root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 479

Revision 479, 63.7 KB (checked in by bhilburn, 15 years ago)

A few changes to the SML which might be necessary for the sql code to
work properly.

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/containers.h"
55#include "vtcross/service_management_layer.h"
56#include "vtcross/socketcomm.h"
57
58
59using namespace std;
60
61
62/* Internal structs used to keep track of sqlite database information. */
63struct services_s {
64    string filename;
65    string tablename;
66    string command;
67    sqlite3 *db;
68    unsigned int num_columns;
69};
70
71struct data_s {
72    string filename;
73    string tablename;
74    string command;
75    sqlite3 *db;
76    unsigned int num_columns;
77};
78
/* Pointer aliases for the two database bookkeeping structs. */
typedef struct services_s *services_DB_t;
typedef struct data_s *data_DB_t;

/* File-wide tracking of the services and data that this instance of SML is
 * aware of. Both start out as null pointers. */
services_DB_t services_DB = NULL;
data_DB_t data_DB = NULL;

/* The filename of the SML configuration, used if the SML is told to reload its
 * configuration. */
std::string _SML_Config;

/* Keeps track of whether or not the 'shell' component is used by the active
 * mission. */
bool shellUsed = false;
94
95
96/* Callback function used internally by some of the SQLite3 commands for debug
97 * output. Code provided by sqlite documentation. */
98int32_t
99callback(void *notUsed, int32_t argc, char **argv, char **azColName)
100{
101    for(size_t i = 0; i < argc; i++) {
102        LOG("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
103    }
104
105    LOG("\n");
106    return 0;
107}
108
109
110ServiceManagementLayer::ServiceManagementLayer()
111{
112    LOG("Creating Service Management Layer.\n");
113
114    shellSocketFD = -1;
115    numberOfCognitiveEngines = 0;
116    CE_Present = false;
117    cogEngSrv = 1;
118}
119
120
121/* Free and clear the DB's associated with this SML in the destructor.
122 *
123 * Note that exiting with an error condition will cause SML to not be destructed,
124 * resulting in the DB's staying in memory until the destructor is encountered in
125 * future executions. */
126ServiceManagementLayer::~ServiceManagementLayer()
127{
128    char *errorMsg;
129    int32_t rc;         /* sqlite command return code */
130
131    services_DB->command = "drop table ";
132    services_DB->command.append(services_DB->tablename);
133    rc = sqlite3_exec(services_DB->db, services_DB->command.c_str(), callback, 0, &errorMsg);
134    if((rc != SQLITE_OK) && (rc != 101))
135        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
136
137    services_DB->command = "vacuum";
138    rc = sqlite3_exec(services_DB->db, services_DB->command.c_str(), callback, 0, &errorMsg);
139    if((rc != SQLITE_OK) && (rc != 101))
140        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
141
142    free(services_DB);
143
144    data_DB->command = "drop table ";
145    data_DB->command.append(data_DB->tablename);
146    rc = sqlite3_exec(data_DB->db, data_DB->command.c_str(), callback, 0, &errorMsg);
147    if((rc != SQLITE_OK) && (rc != 101))
148        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
149
150    data_DB->command = "vacuum";
151    rc = sqlite3_exec(data_DB->db, data_DB->command.c_str(), callback, 0, &errorMsg);
152    if((rc != SQLITE_OK) && (rc != 101))
153        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
154
155    free(data_DB);
156}
157
158
159/* Note that sizes of CE_List, miss, and service are hardcoded for now.
160 * Also, their sizes are hardcoded into the code in various places; a fix
161 * for a future version. */
162ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
163    const char* serverName, const char* serverPort, int16_t clientPort)
164{
165    LOG("Creating Service Management Layer.\n");
166
167    _SML_Config = string(SML_Config);
168    SMLport = clientPort;
169
170    ConnectToShell(serverName, serverPort);
171    CE_List = new CE_Reg[10];
172
173    miss = new Mission[10];
174    for(size_t i = 0; i < 10; i++) {
175        miss[i].services = new Service[30];
176    }
177
178    Current_ID = 0;
179
180    LoadConfiguration(SML_Config, miss);
181
182    CreateServicesDB();
183    CreateDataDB();
184}
185
186
187/* CALLED BY: constructor
188 * INPUTS: <none>
189 * OUTPUTS: <none>
190 *
191 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
192 */
193void
194ServiceManagementLayer::CreateServicesDB()
195{
196    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
197    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
198    int32_t rc;             /* sqlite command return code */
199
200    services_DB = new services_s;
201    services_DB->filename="Services_Table";
202    sqlite3_open(services_DB->filename.c_str(), &(services_DB->db));
203
204    services_DB->tablename="Services";
205
206    /* If program execution ends in anything other than a ordered shutdown, DB's will still
207     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
208     * used in the next execution cycle. */
209    services_DB->command = "DROP TABLE if EXISTS Services;";     
210
211    rc = sqlite3_prepare_v2(services_DB->db, services_DB->command.c_str(), 128, &ppStmt, &pzTail);
212    if((rc != SQLITE_OK) && (rc != 101))
213        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
214
215    rc = sqlite3_step(ppStmt);
216    if((rc != SQLITE_OK) && (rc != 101))
217        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
218
219    services_DB->num_columns = 2;
220
221    /* Generate command */
222    services_DB->command="CREATE TABLE ";
223    services_DB->command.append(services_DB->tablename);
224    services_DB->command.append("(ID_Num INT, Service_Name TEXT);");
225
226    /* Execute create table command */
227    rc = sqlite3_prepare_v2(services_DB->db, services_DB->command.c_str(), 128, &ppStmt, &pzTail);
228    if((rc != SQLITE_OK) && (rc != 101))
229        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
230
231    rc = sqlite3_step(ppStmt);
232    if((rc != SQLITE_OK) && (rc != 101))
233        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
234}
235
236
237/* CALLED BY: constructor
238 * INPUTS: <none>
239 * OUTPUTS: <none>
240 *
241 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
242 */
243void
244ServiceManagementLayer::CreateDataDB()
245{
246    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
247    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
248    int32_t rc;             /* sqlite command return code */
249
250    data_DB = new data_s;
251
252    data_DB->filename="Data_Table";
253    sqlite3_open(data_DB->filename.c_str(), &(data_DB->db));
254
255    data_DB->tablename = "Data";
256    data_DB->command = "DROP TABLE if EXISTS Data;";     
257
258    rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), 128, &ppStmt, &pzTail);
259    if((rc != SQLITE_OK) && (rc != 101))
260        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
261
262    rc = sqlite3_step(ppStmt);
263    if((rc != SQLITE_OK) && (rc != 101))
264        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
265
266    data_DB->num_columns = 2;
267
268    /* First column is the name of the data (corresponding to the name of the output/input pair)
269     * It is the primary key so any subsequent data with the same name will replace the row. */
270    data_DB->command = "CREATE TABLE ";
271    data_DB->command.append(data_DB->tablename);
272    data_DB->command.append("(Tag TEXT PRIMARY KEY ON CONFLICT REPLACE, Data TEXT);");
273
274    rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), 128, &ppStmt, &pzTail);
275    if((rc != SQLITE_OK) && (rc != 101))
276        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
277
278    rc = sqlite3_step(ppStmt);
279    if((rc != SQLITE_OK) && (rc != 101))
280        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
281}
282
283
284/* CALLED BY: MessageHandler
285 * INPUTS: <none>
286 * OUTPUTS: <none>
287 *
288 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
289 */
290void
291ServiceManagementLayer::SendComponentType()
292{
293    SendMessage(shellSocketFD, "response_sml");
294    LOG("SML responded to GetRemoteComponentType query.\n");
295}
296
297
298/* CALLED BY: constructor
299 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
300 *         |serverPort| the port on the server to connect to
301 * OUTPUTS: <none>
302 *
303 * DESCRIPTION: Connecting to the shell takes 2 steps
304 * 1) Establish a client socket for communication
305 * 2) Run the initial Registration/handshake routine
306 */
307void
308ServiceManagementLayer::ConnectToShell(const char* serverName, \
309        const char* serverPort)
310{
311    shellSocketFD = ClientSocket(serverName, serverPort);
312    RegisterComponent();
313}
314
315
316/* CALLED BY: StartSMLServer
317 * INPUTS: |ID| The ID number of the CE that has a message wating
318 * OUTPUTS: <none>
319 *
320 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
321 *              This funciton reads the message and calls the appropriate helper
322 */
323void
324ServiceManagementLayer::MessageHandler(int32_t ID)
325{
326    char buffer[256];   
327    memset(buffer, 0, 256); 
328    int32_t _FD; 
329   
330    if(ID != -1)
331        _FD = CE_List[ID].FD;
332    else
333        _FD = shellSocketFD;
334
335    ReadMessage(_FD, buffer);
336   
337    /* Go down the list to call the appropriate function */
338    if(strcmp(buffer, "query_component_type") == 0) {
339        SendComponentType();
340    }
341    else if(strcmp(buffer, "reset_sml") == 0) {
342        Reset();
343    }
344    else if(strcmp(buffer, "shutdown_sml") == 0) {
345        Shutdown();
346    }
347    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
348        RegisterCognitiveEngine(ID);
349    }
350    else if(strcmp(buffer, "register_service") == 0) {
351        ReceiveServices(ID);
352    }
353    else if(strcmp(buffer, "send_component_type") == 0) {
354        SendComponentType();
355    }
356    else if(strcmp(buffer, "list_services") == 0) {
357        ListServices();
358    }
359    else if(strcmp(buffer, "set_active_mission") == 0) {
360        SetActiveMission();
361    }
362    else if(strcmp(buffer, "request_optimization") == 0) {
363        PerformActiveMission();
364    }
365    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
366        DeregisterCognitiveEngine(ID);
367    }
368    else if(strcmp(buffer, "deregister_service") == 0) {
369        DeregisterServices(ID);
370    }
371}
372
373
374/* CALLED BY: MessageHandler
375 * INPUTS: <none>
376 * OUTPUTS: <none>
377 *
378 * DESCRIPTION: Deregisters the component from the Shell.
379 */
380void
381ServiceManagementLayer::Shutdown()
382{
383    DeregisterComponent();
384}
385
386
387/* CALLED BY: MessageHandler
388 * INPUTS: <none>
389 * OUTPUTS: <none>
390 *
391 * DESCRIPTION: Deregisters the component from the Shell
392 */
393void
394ServiceManagementLayer::Reset()
395{
396    DeregisterComponent();
397    ReloadConfiguration();
398}
399
400
401/* CALLED BY: ConnectToShell
402 * INPUTS: <none>
403 * OUTPUTS: <none>
404 *
405 * DESCRIPTION: Sends the registration message to the Shell
406 */
407void
408ServiceManagementLayer::RegisterComponent()
409{
410    SendMessage(shellSocketFD, "register_sml");
411    LOG("ServiceManagementLayer:: Registration message sent.\n");
412}
413
414
415/* CALLED BY: Shutdown
416 * INPUTS: <none>
417 * OUTPUTS: <none>
418 *
419 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
420 */
421void
422ServiceManagementLayer::DeregisterComponent()
423{
424    SendMessage(shellSocketFD, "deregister_sml");
425    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
426
427    shutdown(shellSocketFD, 2);
428    close(shellSocketFD);
429    shellSocketFD = -1;
430    LOG("ServiceManagementLayer:: Shell socket closed.\n");
431}
432
433
434/* CALLED BY: RegisterCognitiveEngine
435 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
436 * OUTPUTS: <none>
437 *
438 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
439 * for an "ack" message from the CE after every sent message
440 * to know when to stop communication.
441 *
442 * NOTE: Modified to check the incoming message buffer rather than the outgoing
443 * message buffer to avoid a portion of the delay. May change this again to handle
444 * data more inteligently, taking advantage of it's properties.
445 */
446void
447ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
448{
449    struct timeval selTimeout;
450    fd_set sockSet;
451    int32_t rc = 1;
452    char buffer[256];
453
454    /* Send data until the CE sends an ACK message back */
455    while(rc != 0) {
456        memset(buffer, 0, 256);
457
458        /* Receive data from Shell */
459        ReadMessage(shellSocketFD, buffer);
460
461        /* Send data to CE */
462        SendMessage(CE_List[ID].FD, buffer);
463        FD_ZERO(&sockSet);
464        FD_SET(shellSocketFD, &sockSet);
465        selTimeout.tv_sec = 0;
466        selTimeout.tv_usec = 5000;
467
468        /* Check ifthere is a message on the shell ready to be processed */
469        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
470    }
471
472    memset(buffer, 0, 256);
473    ReadMessage(CE_List[ID].FD, buffer);
474    SendMessage(shellSocketFD, buffer);
475}
476
477
478/* CALLED BY: RegisterCognitiveEngine
479 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
480 * OUTPUTS: <none>
481 *
482 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
483 *
484 * NOTE: Modified to check the incoming message buffer rather than the outgoing
485 * message buffer to avoid a portion of the delay. May change this again to handle
486 * data more inteligently, taking advantage of it's properties.
487 */
488void
489ServiceManagementLayer::TransferExperience(int32_t ID)
490{
491    struct timeval selTimeout;
492    fd_set sockSet;
493    int32_t rc = 1;
494    char buffer[256];
495    /* Send data until the CE sends an ACK message back */
496    while(rc != 0) {
497        memset(buffer, 0, 256);
498
499        /* Receive data from Shell */
500        ReadMessage(shellSocketFD, buffer);
501
502        /* Send data to CE */
503        SendMessage(CE_List[ID].FD, buffer);
504        FD_ZERO(&sockSet);
505        FD_SET(shellSocketFD, &sockSet);
506        selTimeout.tv_sec = 0;
507        selTimeout.tv_usec = 5000;
508
509        /* Check ifthere is a message on the shell ready to be processed */
510        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
511    }
512
513    memset(buffer, 0, 256);
514    ReadMessage(CE_List[ID].FD, buffer);
515    SendMessage(shellSocketFD, buffer);
516}
517
518
519/* CALLED BY: MessageHandler
520 * INPUTS: |ID| The ID number of the component where service is located
521 * OUTPUTS: <none>
522 *
523 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
524 */
525void
526ServiceManagementLayer::ReceiveServices(int32_t ID)
527{
528    char buffer[256];
529    memset(buffer, 0, 256);
530    ReadMessage(CE_List[ID].FD, buffer);
531
532    /* Generate command */
533    services_DB->command = "insert into ";
534    services_DB->command.append(services_DB->tablename);
535    services_DB->command.append(" (ID_Num, Service_Name) values(");
536
537    char temp[3];
538    memset(temp, 0, 3);
539    sprintf(temp, "%d", ID);
540
541    services_DB->command.append(temp);
542    services_DB->command.append(", '");
543    services_DB->command.append(buffer);
544    services_DB->command.append("');");
545   
546    /* Execute add command */
547    char *errorMsg;
548    int rc = sqlite3_exec(services_DB->db, services_DB->command.c_str(), callback, 0, &errorMsg);
549    if((rc != SQLITE_OK) && (rc != 101))
550        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
551}
552
553
554/* CALLED BY: MessageHandler
555 * INPUTS: <none>
556 * OUTPUTS: <none>
557 *
558 * DESCRIPTION: This method associates the services that components provide with the
559 * services that are requested in the mission. Each service in the mission is given
560 * the ID and FD of a component that has registered to provide that service. Deregistration
561 * is okay until this method is called without a reload, but ifderegistration occurs after this
562 * method is called it needs to be called again even ifother engines also provide the services
563 */
564void
565ServiceManagementLayer::SetActiveMission()
566{
567    char buffer[256];
568    memset(buffer, 0, 256);
569    ReadMessage(shellSocketFD, buffer);
570
571    uint32_t missID = atoi(buffer);
572    for(activeMission = 0; activeMission < 10; activeMission++) {
573        /* Find the active mission by comparing mission ID's */
574        if(miss[activeMission].missionID == missID)
575            break;
576    }
577
578    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID);
579
580    /* For each service in the mission */
581    for(size_t i = 0; i < miss[activeMission].numServices; i++) {
582        /* Check whether the current service is an actual service or a conditional */
583        if(miss[activeMission].services[i].name.compare("if") && \
584                miss[activeMission].services[i].name.compare("dowhile") && \
585                miss[activeMission].services[i].name.compare("shell")) {
586                /* ifit is a service, search the database of registered services to find
587                 * the ID of the component that registered it */
588                services_DB->command="select ";
589                services_DB->command.append(services_DB->tablename);
590                services_DB->command.append(".* from ");
591                services_DB->command.append( services_DB->tablename);
592                services_DB->command.append(" where Service_Name=='");
593                services_DB->command.append(miss[activeMission].services[i].name);
594                services_DB->command.append("';");
595   
596                sqlite3_stmt * pStatement;
597                int32_t rc = sqlite3_prepare_v2(services_DB->db, services_DB->command.c_str(), \
598                        -1, &pStatement, NULL);
599                if(rc == SQLITE_OK) {
600                    if(sqlite3_step(pStatement) == SQLITE_ROW)
601                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);
602                    else {
603                        WARNING("services_DB:: Mission requires service %s ", \
604                                miss[activeMission].services[i].name.c_str());
605                        WARNING("not provided by any connected component.\n");
606                        rc = 31337;
607                    }
608                } else {
609                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \
610                            rc, services_DB->command.c_str());
611                }
612
613                sqlite3_finalize(pStatement);
614                miss[activeMission].services[i].socketFD = \
615                    CE_List[miss[activeMission].services[i].componentID].FD;
616        }
617        /* TODO Nothing to be done for conditionals at this stage */
618    }
619 
620    SendMessage(shellSocketFD, "ack");
621    LOG("ServiceManagementLayer:: Done setting active mission.\n");
622}
623
624
625/* CALLED BY: PerformActiveMission
626 * INPUTS: |sourceID| ID of the service that is being processed
627 * OUTPUTS: <none>
628 *
629 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
630 * NOTE: This function has changed drastically from the previous implementation
631 *
632 * Takes an ID of a service. For that service, finds inputs in DB and forwords
633 * those on to the engine after sending comm-starting messages. Afterwords, listenes
634 * for the outputs so that it can store those in the database for future services or
635 * the overall output
636 */
637void
638ServiceManagementLayer::TransactData(int32_t sourceID)
639{
640    char buffer[256];
641    std::string data;
642    char *token;
643
644   /* Send a message directly to the shell */
645   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) {
646       shellUsed=true;
647
648       int32_t k = 0;
649       while((k < 10) && (!miss[activeMission].input[k].empty())) {
650           k++;
651       }
652
653       sprintf(buffer, "%d", k);
654       SendMessage(shellSocketFD, buffer);
655       for(int32_t t = 0; t < k; t++) {
656           memset(buffer, 0 , 256);
657           data_DB->command="select ";
658           data_DB->command.append(data_DB->tablename);
659           data_DB->command.append(".* from ");
660           data_DB->command.append(data_DB->tablename);
661           data_DB->command.append(" where Tag=='");
662           data_DB->command.append(miss[activeMission].input[t]);
663           data_DB->command.append("';");
664           sqlite3_stmt * pStatement;
665
666           int32_t rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), \
667                   -1, &pStatement, NULL);
668           if(rc == SQLITE_OK) {
669               if(sqlite3_step(pStatement) == SQLITE_ROW)
670                   data=((const char*) sqlite3_column_text(pStatement, 1));
671               else {
672                   LOG("3data_DB:: Data not yet in DB., %s\n", data_DB->command.c_str());
673                   rc = 31337;
674               }
675           }
676           else {
677               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
678                       rc,data_DB->command.c_str());
679           }
680
681           sqlite3_finalize(pStatement);
682           token = strtok((char *) data.c_str(), "@");
683           token = strtok(NULL, "@");
684           SendMessage(shellSocketFD, token);
685           token = strtok(NULL, "@");
686           SendMessage(shellSocketFD, token);
687       }
688
689       return;
690   }
691
692    /* ifthis is a service command and not a shell command... */
693    /* Transmission starting messages */
694    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
695    SendMessage(miss[activeMission].services[sourceID].socketFD, \
696            miss[activeMission].services[sourceID].name.c_str());
697}
698
699/* CALLED BY: MessageHandler
700 * INPUTS: <none>
701 * OUTPUTS: <none>
702 *
703 * DESCRIPTION: This function works by first sending the inputs from the shell to
704 * the appropriate components. The first service should begin immediately, as
705 * should any others that have all of their input parameters. When they complete,
706 * the output path is found and the data is transferred as it becomes available.
707 * Presumably at this point the second function has all of its parameters, so it
708 * begins to compute, and the cycle repeats.
709 *
710 * Rules for active missions (currently)
711 * -Five inputs/outputs per service and per mission
712 * -All ordering constraints have been relaxed in this version; all data is stored
713 *  locally and only sent when requested
714 * -If and while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot)
715 * -For dowhiles, assumes loop condition determined on last line
716 *
717 * -IMPORTANT: DB uses '@' to separate individual statements; using '@' in the data
718 *  stream will result in incorrect behavior
719 */
720void
721ServiceManagementLayer::PerformActiveMission()
722{
723    shellUsed = false;
724    std::string data_param;
725    std::string data_obsv;
726    std::string data;
727    std::string input;
728    std::string check;
729
730    char buffer[256];
731    char buffer1[256];
732    std::string token, token2;
733    std::string data2;
734
735    int32_t rc;
736    char *errorMsg;
737
738    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
739
740    /* Get the inputs */
741    memset(buffer, 0, 256);
742    ReadMessage(shellSocketFD, buffer);
743
744    /* Receive Set of Parameters */
745    memset(buffer, 0, 256);
746    ReadMessage(shellSocketFD, buffer);
747    int32_t t = atoi(buffer);
748    for(size_t m = 0; m < t; m++) {
749        memset(buffer1, 0, 256);
750        ReadMessage(shellSocketFD, buffer1);
751        data_DB->command="insert into ";
752        data_DB->command.append(data_DB->tablename);
753        data_DB->command.append(" (Tag, Data) ");
754
755        memset(buffer, 0, 256);
756        ReadMessage(shellSocketFD, buffer);
757        data_DB->command.append(" values('");
758        data_DB->command.append(buffer1);
759        data_DB->command.append("', '1@");
760        data_DB->command.append(buffer1);
761        data_DB->command.append("@");
762        data_DB->command.append(buffer);
763        data_DB->command.append("');");
764
765        rc = sqlite3_exec(data_DB->db, data_DB->command.c_str(), callback, 0, &errorMsg);
766        if((rc != SQLITE_OK) && (rc != 101))
767           WARNING("SQL error: %s\n", errorMsg);
768    }
769
770    int32_t numstatements[3] = {0 ,0 ,0};
771    for(size_t i; i < miss[activeMission].numServices; i++) {
772        if(miss[activeMission].services[i].name.compare("if") == 0) {
773            input.clear();
774            check.clear();
775
776            for(size_t t = 0; t < 10; t++) {
777                if(!miss[activeMission].services[i].output[t].empty()) {
778                    input = miss[activeMission].services[i - numstatements[0] - 1].output[t];
779
780                    data_DB->command="SELECT ";
781                    data_DB->command.append(data_DB->tablename);
782                    data_DB->command.append(".* from ");
783                    data_DB->command.append(data_DB->tablename);
784                    data_DB->command.append(" where Tag=='");
785                    data_DB->command.append(input);
786                    data_DB->command.append("';");
787
788                    sqlite3_stmt *pStatement;
789                    rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), \
790                            -1, &pStatement, NULL);
791                    if(rc == SQLITE_OK) {
792                        if(sqlite3_step(pStatement) == SQLITE_ROW)
793                             data = (const char *) sqlite3_column_text(pStatement, 1);
794                        else {
795                            WARNING("1 data_DB:: Data not yet in DB.\n");
796                            rc=31337;
797                        }
798                    } else {
799                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
800                                rc,data_DB->command.c_str());
801                    }
802
803                    sqlite3_finalize(pStatement);
804                    int32_t pos = data.find_last_of("@", data.length() - 2);
805                    token = data.substr(pos + 1);
806                    token.erase(token.length() - 1);
807                    data.clear();
808                    break;
809                }
810            }
811
812            bool doit = false;
813            if(miss[activeMission].services[i].output[t].find(">") != string::npos) {
814                std::string data2;
815                data_DB->command="SELECT ";
816                data_DB->command.append(data_DB->tablename);
817                data_DB->command.append(".* from ");
818                data_DB->command.append(data_DB->tablename);
819                data_DB->command.append(" where Tag=='");
820                data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
821                data_DB->command.append("';");
822                sqlite3_stmt *pStatement;
823
824                rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), \
825                        -1, &pStatement, NULL);
826                if(rc == SQLITE_OK) {
827                    if(sqlite3_step(pStatement) == SQLITE_ROW)
828                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
829                    else {
830                        WARNING("2 data_DB:: Data not yet in DB.\n");
831                        rc = 31337;
832                    }
833                } else {
834                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
835                            rc, data_DB->command.c_str());
836                }
837
838                sqlite3_finalize(pStatement);
839
840                int32_t pos = data2.find_last_of("@", data2.length() - 2);
841                token2 = data2.substr(pos + 1);
842                token2.erase(token2.length() - 1);
843                if(atof(token.c_str()) > atof(token2.c_str()))
844                    doit = true;
845            }
846            else if(miss[activeMission].services[i].output[t].find(token) != string::npos)
847                doit = true;
848
849            if(doit) {
850                for(size_t k = i + 1; k <= i+miss[activeMission].services[i].num_conds; k++) {
851                    if(miss[activeMission].services[k].name.compare("if") == 0) {
852                        input.clear();
853                        check.clear();
854                        for(size_t t = 0; t < 10; t++) {
855                            if(!miss[activeMission].services[k].output[t].empty()) {
856                                input = miss[activeMission].services[k - numstatements[1] - 1].output[t];
857                                data_DB->command="SELECT ";
858                                data_DB->command.append(data_DB->tablename);
859                                data_DB->command.append(".* from ");
860                                data_DB->command.append(data_DB->tablename);
861                                data_DB->command.append(" where Tag=='");
862                                data_DB->command.append(input);
863                                data_DB->command.append("';");
864
865                                sqlite3_stmt *pStatement;
866                                rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), \
867                                        -1, &pStatement, NULL);
868                                if(rc == SQLITE_OK) {
869                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
870                                         data = (const char *) sqlite3_column_text(pStatement, 1);
871                                    else {
872                                        WARNING("3 data_DB:: Data not yet in DB.\n");
873                                        rc = 31337;
874                                    }
875                                } else {
876                                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
877                                            rc,data_DB->command.c_str());
878                                }
879
880                                sqlite3_finalize(pStatement);
881                                int32_t pos = data.find_last_of("@", data.length() - 2);
882                                token = data.substr(pos + 1);
883                                token.erase(token.length() - 1);
884                                break;
885                            }
886                        }
887
888                        bool doit = false;
889                        if(miss[activeMission].services[k].output[t].find(">") != string::npos) {
890                            std::string data2;
891                            data_DB->command="SELECT ";
892                            data_DB->command.append(data_DB->tablename);
893                            data_DB->command.append(".* from ");
894                            data_DB->command.append(data_DB->tablename);
895                            data_DB->command.append(" where Tag=='");
896                            data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
897                            data_DB->command.append("';");
898
899                            sqlite3_stmt *pStatement;
900                            rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), \
901                                    -1, &pStatement, NULL);
902                            if(rc == SQLITE_OK) {
903                                if(sqlite3_step(pStatement) == SQLITE_ROW)
904                                     data2 = (const char *) sqlite3_column_text(pStatement, 1);
905                                else {
906                                    WARNING("4 data_DB:: Data not yet in DB.\n");
907                                    rc = 31337;
908                                }
909                            } else {
910                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
911                                        rc, data_DB->command.c_str());
912                            }
913
914                            sqlite3_finalize(pStatement);
915                            int32_t pos = data2.find_last_of("@", data2.length() - 2);
916                            token2 = data2.substr(pos + 1);
917                            token2.erase(token2.length() - 1);
918                            if(atof(token.c_str()) > atof(token2.c_str()))
919                                doit = true;
920                        }
921                        else if(miss[activeMission].services[k].output[t].find(token) != string::npos)
922                            doit=true;
923
924                        if(doit) {
925                            for(size_t j = k + 1; j <= k+miss[activeMission].services[k].num_conds; j++) {
926                                if(miss[activeMission].services[j].name.compare("if") == 0) {
927                                    input.clear();
928                                    check.clear();
929                                    for(t = 0; t < 10; t++) {
930                                        if(!miss[activeMission].services[j].output[t].empty()) {
931                                            input = miss[activeMission].services[j - numstatements[2] - 1].output[t];
932
933                                            data_DB->command="SELECT ";
934                                            data_DB->command.append(data_DB->tablename);
935                                            data_DB->command.append(".* from ");
936                                            data_DB->command.append(data_DB->tablename);
937                                            data_DB->command.append(" where Tag=='");
938                                            data_DB->command.append(input);
939                                            data_DB->command.append("';");
940
941                                            sqlite3_stmt *pStatement;
942
943                                            rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), -1, &pStatement, NULL);
944                                            if(rc == SQLITE_OK) {
945                                                if(sqlite3_step(pStatement) == SQLITE_ROW)
946                                                     data = (const char *) sqlite3_column_text(pStatement, 1);
947                                                else {
948                                                    WARNING("5 data_DB:: Data not yet in DB.\n");
949                                                    rc = 31337;
950                                                }
951                                            } else {
952                                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
953                                                        rc, data_DB->command.c_str());
954                                            }
955
956                                            sqlite3_finalize(pStatement);
957                                            int32_t pos = data.find_last_of("@", data.length()-2);
958                                            token = data.substr(pos+1);
959                                            token.erase(token.length()-1);
960                                            data.clear();
961                                            break;
962                                        }
963                                    }
964
965                                    bool doit = false;
966                                    if(miss[activeMission].services[j].output[t].find(">") != string::npos) {
967                                        data_DB->command="SELECT ";
968                                        data_DB->command.append(data_DB->tablename);
969                                        data_DB->command.append(".* from ");
970                                        data_DB->command.append(data_DB->tablename);
971                                        data_DB->command.append(" where Tag=='");
972                                        data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
973                                        data_DB->command.append("';");
974
975                                        sqlite3_stmt *pStatement;
976                                        rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), -1, &pStatement, NULL);
977                                        if(rc == SQLITE_OK) {
978                                            if(sqlite3_step(pStatement) == SQLITE_ROW)
979                                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
980                                            else {
981                                                WARNING("6 data_DB:: Data not yet in DB.\n");
982                                                rc=31337;
983                                            }
984                                        } else {
985                                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
986                                                    rc, data_DB->command.c_str());
987                                        }
988
989                                        sqlite3_finalize(pStatement);
990
991                                        int32_t pos = data2.find_last_of("@", data2.length()-2);
992                                        token2 = data2.substr(pos + 1);
993                                        token2.erase(token2.length()-1);
994
995                                        if(atof(token.c_str()) > atof(token2.c_str()))
996                                            doit=true;
997
998                                        data.clear();
999                                    }
1000
1001                                    else if(miss[activeMission].services[j].output[t].find(token) != string::npos)
1002                                        doit = true;
1003
1004                                    if(doit) {
1005                                        for(size_t l = j + 1; l <= j + miss[activeMission].services[j].num_conds; l++) {
1006                                            TransactData(l);
1007                                        }
1008                                    }
1009
1010                                    numstatements[2] += miss[activeMission].services[j].num_conds + 1;
1011                                    j += miss[activeMission].services[j].num_conds;
1012                                } else if(miss[activeMission].services[j].name.compare("dowhile") == 0) {
1013                                    numstatements[0]=0;
1014
1015                                    while(true) {
1016                                        uint32_t l;
1017                                        for(l = j + 1; l <= j+miss[activeMission].services[j].num_conds; l++) {
1018                                            TransactData(l);
1019                                        }
1020
1021                                        data.clear();
1022                                        input.clear();
1023                                        check.clear();
1024
1025                                        int32_t t;
1026                                        for(t = 0; t < 10; t++) {
1027                                            if(!miss[activeMission].services[j].output[t].empty()){
1028                                                input=miss[activeMission].services[l-2].output[t];
1029                                                data_DB->command="SELECT ";
1030                                                data_DB->command.append(data_DB->tablename);
1031                                                data_DB->command.append(".* from ");
1032                                                data_DB->command.append(data_DB->tablename);
1033                                                data_DB->command.append(" where Tag=='");
1034                                                data_DB->command.append(input);
1035                                                data_DB->command.append("';");
1036                                                sqlite3_stmt *pStatement;
1037
1038                                                rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), -1, &pStatement, NULL);
1039                                                if(rc == SQLITE_OK){
1040                                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
1041                                                         data = (const char *) sqlite3_column_text(pStatement, 1);
1042                                                    else {
1043                                                        WARNING("7 data_DB:: Data not yet in DB.: %s\n", data_DB->command.c_str());
1044                                                        rc = 31337;
1045                                                    }
1046                                                } else {
1047                                                    WARNING("data_DB:: SQL statement error. rc = %i\n%s\n", \
1048                                                            rc, data_DB->command.c_str());
1049                                                }
1050
1051                                                sqlite3_finalize(pStatement);
1052                                                int32_t pos = data.find_last_of("@", data.length() - 2);
1053                                                token = data.substr(pos + 1);
1054                                                token.erase(token.length() - 1);
1055                                                break;
1056                                            }
1057                                        }
1058
1059                                        if(miss[activeMission].services[j].output[t].find(token) == string::npos) {
1060                                            break;
1061                                        }
1062                                    }
1063
1064                                    j += miss[activeMission].services[j].num_conds;
1065                                } else {
1066                                    numstatements[2] = 0;
1067                                    TransactData(j);
1068                                }
1069                            }
1070                        }
1071
1072                        numstatements[1] += miss[activeMission].services[k].num_conds + 1;
1073                        k += miss[activeMission].services[k].num_conds;
1074                    } else if(miss[activeMission].services[k].name.compare("dowhile") == 0) {
1075                        numstatements[0] = 0;
1076                        while(true) {
1077                            int32_t j;
1078                            for(j = k + 1; j <= k + miss[activeMission].services[k].num_conds; j++) {
1079                                TransactData(j);
1080                            }
1081
1082                            data.clear();
1083                            input.clear();
1084                            check.clear();
1085                            int32_t t;
1086                            for(t = 0; t < 10; t++) {
1087                                if(!miss[activeMission].services[k].output[t].empty()) {
1088                                    input = miss[activeMission].services[j - 1].output[t];
1089                                    data_DB->command="SELECT ";
1090                                    data_DB->command.append(data_DB->tablename);
1091                                    data_DB->command.append(".* from ");
1092                                    data_DB->command.append(data_DB->tablename);
1093                                    data_DB->command.append(" where Tag=='");
1094                                    data_DB->command.append(input);
1095                                    data_DB->command.append("';");
1096
1097                                    sqlite3_stmt *pStatement;
1098                                    rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), -1, &pStatement, NULL);
1099                                    if(rc == SQLITE_OK) {
1100                                        if(sqlite3_step(pStatement) == SQLITE_ROW)
1101                                             data = (const char *) sqlite3_column_text(pStatement, 1);
1102                                        else {
1103                                            WARNING("8 data_DB:: Data not yet in DB.\n");
1104                                            rc = 31337;
1105                                        }
1106                                    } else {
1107                                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1108                                                rc, data_DB->command.c_str());
1109                                    }
1110
1111                                    sqlite3_finalize(pStatement);
1112                                    int32_t pos = data.find_last_of("@", data.length() - 2);
1113                                    token = data.substr(pos + 1);
1114                                    token.erase(token.length() - 1);
1115                                    break;
1116                                }
1117                            }
1118
1119                            if(miss[activeMission].services[k].output[t].find(token) == string::npos) {
1120                                break;
1121                            }
1122                        }
1123
1124                        k += miss[activeMission].services[k].num_conds;
1125                    } else{
1126                        numstatements[1] = 0;
1127                        TransactData(k);
1128                    }
1129                }
1130            }
1131
1132            numstatements[0] += miss[activeMission].services[i].num_conds + 1;
1133            i += miss[activeMission].services[i].num_conds;
1134        } else if(miss[activeMission].services[i].name.compare("dowhile") == 0) {
1135            numstatements[0] = 0;
1136            while(true) {
1137                uint32_t k;
1138                for(k = i + 1; k <= i + miss[activeMission].services[i].num_conds; k++){
1139                    TransactData(k);
1140                }
1141
1142                data.clear();
1143                input.clear();
1144                check.clear();
1145                int32_t t;
1146                for(t = 0; t < 10; t++){
1147                    if(!miss[activeMission].services[i].output[t].empty()) {
1148                        input = miss[activeMission].services[k - 1].output[t];
1149                        data_DB->command="SELECT ";
1150                        data_DB->command.append(data_DB->tablename);
1151                        data_DB->command.append(".* from ");
1152                        data_DB->command.append(data_DB->tablename);
1153                        data_DB->command.append(" where Tag=='");
1154                        data_DB->command.append(input);
1155                        data_DB->command.append("';");
1156
1157                        sqlite3_stmt *pStatement;
1158                        rc = sqlite3_prepare_v2(data_DB->db, data_DB->command.c_str(), -1, &pStatement, NULL);
1159                        if(rc == SQLITE_OK) {
1160                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1161                                 data = (const char *) sqlite3_column_text(pStatement, 1);
1162                            else {
1163                                WARNING("10data_DB:: Data not yet in DB.\n");
1164                                rc = 31337;
1165                            }
1166                        } else {
1167                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1168                                    rc, data_DB->command.c_str());
1169                        }
1170
1171                        sqlite3_finalize(pStatement);
1172                        int32_t pos = data.find_last_of("@", data.length()-2);
1173                        token = data.substr(pos + 1);
1174                        token.erase(token.length() - 1);
1175                        break;
1176                    }
1177                }
1178
1179                if(miss[activeMission].services[i].output[t].find(token) == string::npos) {
1180                    break;
1181                }
1182            }
1183
1184            i += miss[activeMission].services[i].num_conds;
1185        } else{
1186            numstatements[0] = 0;
1187            TransactData(i);
1188        }
1189    }
1190
1191    int32_t i = 0;
1192    data.clear();
1193
1194    if(!shellUsed) {
1195        int k = 0;
1196        while(k < 10 && !miss[activeMission].input[k].empty()) {
1197            k++;
1198        }
1199
1200        sprintf(buffer, "%d", k);
1201        SendMessage(shellSocketFD, buffer);
1202
1203        for(size_t t = 0; t < k; t++) {
1204            SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1205            SendMessage(shellSocketFD, "0");
1206        }
1207    }
1208
1209    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1210}
1211
1212
1213/* CALLED BY: MessageHandler
1214 * INPUTS: <none>
1215 * OUTPUTS: <none>
1216 *
1217 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1218 */
1219void
1220ServiceManagementLayer::ListServices()
1221{
1222    services_DB->command="select ";
1223    services_DB->command.append(services_DB->tablename);
1224    services_DB->command.append(".* from ");
1225    services_DB->command.append(services_DB->tablename);
1226    services_DB->command.append(";");
1227
1228    // Execute print (select all) command
1229    char *errorMsg;
1230    int32_t rc = sqlite3_exec(services_DB->db, services_DB->command.c_str(), callback, 0, &errorMsg);
1231    if((rc != SQLITE_OK) && (rc != 101))
1232        WARNING("SQL error: %s\n", errorMsg);
1233
1234    LOG("database %s, table %s:\n", services_DB->filename.c_str(), services_DB->tablename.c_str());
1235}
1236
1237
1238/* CALLED BY: Reset
1239 * INPUTS: <none>
1240 * OUTPUTS: <none>
1241 *
1242 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1243 */
1244void
1245ServiceManagementLayer::ReloadConfiguration()
1246{
1247    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1248
1249    free(miss);
1250
1251    miss = new Mission[10];
1252    for(size_t i = 0; i < 10; i++)
1253        miss[i].services = new Service[30];
1254
1255    LoadConfiguration(_SML_Config.c_str(), miss);
1256}
1257
1258
1259/* CALLED BY: constructor
1260 * INPUTS: |SML_Config| Address (either relitive or full) of the XML file containing mission data
1261 *        |mList| Mission array to be modified
1262 * OUTPUTS: <none>
1263 *
1264 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1265 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1266 * Also, can handle two layer of nested conditional statements, but could
1267 * be expanded to meet additional needs.
1268 *
1269 * Components assigned to mission during "set active mission" stage so that
1270 * components can still continue to register after the configuration is loaded
1271 */
1272void
1273ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1274{
1275    TiXmlElement *pMission;
1276    TiXmlElement *pService;
1277    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1278    TiXmlHandle hRoot(0);
1279
1280    LOG("ServiceManagementLayer:: Loading Configuration.\n");
1281
1282    TiXmlDocument doc(".");
1283    doc.LoadFile(SML_Config);
1284    bool loadOkay = doc.LoadFile();
1285    if(!loadOkay)
1286        WARNING("Loading SML configuration failed: %s\n", SML_Config);
1287
1288    TiXmlHandle hDoc(&doc);
1289   
1290    pMission = hDoc.FirstChildElement().Element();
1291
1292    if(!pMission)
1293        WARNING("No valid root!");
1294
1295    hRoot = TiXmlHandle(pMission);
1296    pService = pMission->FirstChildElement();
1297
1298    int32_t mission_num = 0;
1299
1300    /* Iterate through the missions */
1301    for(pChild0 = pMission->FirstChildElement(); pChild0 ; pChild0 = pChild0->NextSiblingElement())  {
1302        int32_t service_num = 0;
1303        uint16_t cond_array[] = {0, 0, 0};
1304   
1305        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1306                pChild1  = pChild1->NextSiblingElement()) {
1307
1308            int32_t conditional_0 = service_num;
1309            for(pChild2 = pChild1->FirstChildElement(); pChild2; pChild2 = pChild2->NextSiblingElement()) {
1310                service_num++;
1311
1312                int32_t conditional_1 = service_num;
1313                for(pChild3 = pChild2->FirstChildElement(); pChild3; pChild3 = pChild3->NextSiblingElement()) {
1314                    service_num++;
1315                    int32_t conditional_2 = service_num;
1316                    for(pChild4 = pChild3->FirstChildElement(); pChild4; pChild4 = pChild4->NextSiblingElement()) {
1317                        service_num++;
1318                        if(pChild4->Attribute("name"))
1319                            mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1320                        else
1321                            mList[mission_num].services[service_num].name = pChild4->Value();
1322
1323                        for(size_t i = 1; i <= 10; i++) {
1324                            char buffer[9]="input";
1325                            sprintf(buffer, "%s%d", buffer, i);
1326                            if(pChild4->Attribute(buffer))
1327                                mList[mission_num].services[service_num].input[i - 1] = pChild4->Attribute(buffer);
1328
1329                            char buffer2[9]="output";
1330                            sprintf(buffer2, "%s%d", buffer2, i);
1331                            if(pChild4->Attribute(buffer2))
1332                                mList[mission_num].services[service_num].output[i - 1] = pChild4->Attribute(buffer2);
1333                        }
1334
1335                        if(pChild4->Attribute("parameter"))
1336                            mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1337
1338                        cond_array[2]++;
1339                    }
1340
1341                    if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1342                        mList[mission_num].services[conditional_2].name = pChild3->Value();
1343                    } else {
1344                        mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1345                    }
1346
1347                    for(size_t i = 1; i <= 10; i++) {
1348                        char buffer[9]="input";
1349                        sprintf(buffer, "%s%d", buffer, i);
1350                        if(pChild3->Attribute(buffer))
1351                            mList[mission_num].services[conditional_2].input[i - 1] = pChild3->Attribute(buffer);
1352
1353                        char buffer2[9]="output";
1354                        sprintf(buffer2, "%s%d", buffer2, i);
1355                        if(pChild3->Attribute(buffer2))
1356                            mList[mission_num].services[conditional_2].output[i - 1] = pChild3->Attribute(buffer2);
1357                    }
1358
1359                    if(pChild3->Attribute("parameter"))
1360                        mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1361
1362                    mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1363                    cond_array[1] += cond_array[2] + 1;
1364                    cond_array[2] = 0;
1365                }
1366
1367                if(!strcmp(pChild2->Value(), "shell") || (conditional_1 != service_num)) {
1368                    mList[mission_num].services[conditional_1].name = pChild2->Value();
1369                } else{
1370                    mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1371                }
1372
1373                for(int i = 1; i <= 10; i++) {
1374                    char buffer[9]="input";
1375                    sprintf(buffer, "%s%d", buffer, i);
1376                    if(pChild2->Attribute(buffer))
1377                        mList[mission_num].services[conditional_1].input[i - 1] = pChild2->Attribute(buffer);
1378
1379                    char buffer2[9]="output";
1380                    sprintf(buffer2, "%s%d", buffer2, i);
1381                    if(pChild2->Attribute(buffer2))
1382                        mList[mission_num].services[conditional_1].output[i - 1] = pChild2->Attribute(buffer2);
1383                }
1384
1385                if(pChild2->Attribute("parameter"))
1386                    mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1387
1388                mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1389                cond_array[0] += cond_array[1] + 1;
1390                cond_array[1] = 0;
1391            }
1392       
1393            if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1394                mList[mission_num].services[conditional_0].name = pChild1->Value();
1395            } else{
1396                mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1397            }
1398
1399            for(size_t i = 1; i <= 10; i++) {
1400                char buffer[9]="input";
1401                sprintf(buffer, "%s%d", buffer, i);
1402                if(pChild1->Attribute(buffer))
1403                    mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1404
1405                char buffer2[9]="output";
1406                sprintf(buffer2, "%s%d", buffer2, i);
1407                if(pChild1->Attribute(buffer2))
1408                    mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1409            }
1410
1411            if(pChild1->Attribute("parameter"))
1412                mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1413            mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1414                cond_array[0] = 0;
1415
1416            service_num++;
1417        }
1418   
1419        mList[mission_num].numServices = service_num;
1420        mList[mission_num].name = pChild0->Attribute("name");
1421        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1422
1423        for(size_t i = 1; i <= 10; i++) {
1424            char buffer[9]="param";
1425            sprintf(buffer, "%s%d", buffer, i);
1426            if(pChild0->Attribute(buffer)){
1427                mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1428            }
1429        }
1430
1431        mission_num++;
1432    }
1433
1434    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1435}
1436
1437
1438/* CALLED BY: MessageHandler
1439 * INPUTS: |ID| The ID number of the engine to be registered
1440 * OUTPUTS: <none>
1441 *
1442 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1443 */
1444void
1445ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1446{
1447    SendMessage(shellSocketFD, "register_engine_cognitive");
1448
1449    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1450    char buffer[256];
1451    memset(buffer, 0, 256);
1452    ReadMessage(shellSocketFD, buffer);
1453    SendMessage(CE_List[ID].FD, buffer);
1454
1455    TransferRadioConfiguration(ID);
1456    memset(buffer, 0, 256);
1457    TransferExperience(ID);
1458    memset(buffer, 0, 256);
1459    numberOfCognitiveEngines++;
1460    CE_Present = true;
1461}
1462
1463
1464/* CALLED BY: MessageHandler
1465 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1466 * OUTPUTS: <none>
1467 *
1468 * DESCRIPTION: Deletes individual services from the DB
1469 * NOTE THAT this function only needs to be called ifservice deregistration is going
1470 * to be done at a different time than component deregistration; it is handled
1471 * more efficiently and directly during that deregistration process.
1472 */
1473void
1474ServiceManagementLayer::DeregisterServices(int32_t ID)
1475{
1476    char buffer[256];
1477    memset(buffer, 0, 256);
1478    ReadMessage(CE_List[ID].FD, buffer);
1479    services_DB->command="DELETE FROM ";
1480    services_DB->command.append(services_DB->tablename);
1481    services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1482
1483    char tmp[3];
1484    memset(tmp,0,3);
1485    sprintf(tmp, "%d", ID);
1486    services_DB->command.append(tmp);
1487    services_DB->command.append(" FROM ");
1488    services_DB->command.append(services_DB->tablename);
1489    services_DB->command.append(" WHERE Service_Name");
1490    services_DB->command.append("=='");
1491    services_DB->command.append(buffer);
1492    services_DB->command.append("');");
1493
1494    char *errorMsg;
1495    int32_t rc = sqlite3_exec(services_DB->db, services_DB->command.c_str(), callback, 0, &errorMsg);
1496    if((rc != SQLITE_OK) && (rc != 101))
1497        WARNING("SQL error: %s\n", errorMsg);
1498}
1499
1500
1501/* CALLED BY: MessageHandler
1502 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1503 * OUTPUTS: <none>
1504 *
1505 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1506 * Also, deletes the services from the DB
1507 */
1508void
1509ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1510{
1511    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1512
1513    numberOfCognitiveEngines--;
1514    if(numberOfCognitiveEngines == 0)
1515        CE_Present = false;
1516
1517    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1518    char buffer[256];
1519    memset(buffer, 0, 256);
1520    ReadMessage(shellSocketFD, buffer);
1521    SendMessage(CE_List[ID].FD, buffer);
1522    if(strcmp("deregister_ack", buffer) != 0) {
1523        ERROR(1, "SML:: Failed to close CE socket\n");
1524    }
1525
1526    //Deregister the services
1527    services_DB->command="DELETE FROM ";
1528    services_DB->command.append(services_DB->tablename);
1529    services_DB->command.append(" WHERE ");
1530    services_DB->command.append("ID_Num");
1531    services_DB->command.append("==");
1532
1533    char tmp[3];
1534    memset(tmp,0,3);
1535    sprintf(tmp, "%d", ID);
1536    services_DB->command.append(tmp);
1537    services_DB->command.append(";");
1538
1539    char *errorMsg;
1540    int32_t rc = sqlite3_exec(services_DB->db, services_DB->command.c_str(), callback, 0, &errorMsg);
1541    if((rc != SQLITE_OK) && (rc != 101))
1542        WARNING("SQL error: %s\n", errorMsg);
1543
1544    CE_List[ID].FD = -1;
1545    CE_List[ID].ID_num = -1;
1546
1547    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1548}
1549
1550
1551/* CALLED BY: test class
1552 * INPUTS: <none>
1553 * OUTPUTS: <none>
1554 *
1555 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1556 */
1557void
1558ServiceManagementLayer::StartSMLServer()
1559{
1560    struct timeval selTimeout;
1561    int32_t running = 1;
1562    int32_t port, rc, new_sd = 1;
1563    int32_t desc_ready = 1;
1564    fd_set sockSet, shellSet;
1565
1566    cogEngSrv = CreateTCPServerSocket(SMLport);
1567    int32_t maxDescriptor = cogEngSrv;
1568
1569    if(InitializeTCPServerPort(cogEngSrv) == -1)
1570        ERROR(1,"Error initializing primary port\n");
1571
1572    while (running) {
1573        /* Zero socket descriptor vector and set for server sockets */
1574        /* This must be reset every time select() is called */
1575        FD_ZERO(&sockSet);
1576        FD_SET(cogEngSrv, &sockSet);
1577       
1578        for(uint16_t k = 0; k < Current_ID; k++){
1579            if(CE_List[k].ID_num != -1)
1580                FD_SET(CE_List[k].FD, &sockSet);
1581        }
1582
1583        /* Timeout specification */
1584        /* This must be reset every time select() is called */
1585        selTimeout.tv_sec = 0;      /* timeout (secs.) */
1586        selTimeout.tv_usec = 0;     /* 0 microseconds */
1587
1588        /* Changed both to zero so that select will check messages from the shell
1589         * instead of blocking when there is no command from the CE's to be processed */
1590
1591        /* Check ifthere is a message on the socket waiting to be read */
1592        rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1593        if(rc == 0) {
1594            FD_ZERO(&shellSet);
1595            FD_SET(shellSocketFD, &shellSet);
1596            selTimeout.tv_sec = 0;
1597            selTimeout.tv_usec = 0;
1598
1599            /* Check if there is a message on the shell socket ready to be processed */
1600            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1601            if(FD_ISSET(shellSocketFD, &shellSet)){
1602                MessageHandler(-1);}
1603        } else {
1604            desc_ready = rc;
1605            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1606                if(FD_ISSET(port, &sockSet)) {
1607                    desc_ready -= 1;
1608
1609                    /* Check ifrequest is new or on an existing open descriptor */
1610                    if(port == cogEngSrv) {
1611                        /* If new, assign it a descriptor and give it an ID */
1612                        new_sd = AcceptTCPConnection(port);
1613             
1614                        if(new_sd < 0)
1615                            break;
1616
1617                        CE_List[Current_ID].FD = new_sd;
1618                        CE_List[Current_ID].ID_num = Current_ID;
1619                        MessageHandler(Current_ID);
1620                        Current_ID++;
1621   
1622                        FD_SET(new_sd,&sockSet);
1623                        if(new_sd > maxDescriptor)
1624                           maxDescriptor = new_sd;
1625                    } else {
1626                        /* If old, figure out which ID it coresponds to and handle it accordingly */
1627                        for(size_t z = 0; z < Current_ID; z++) {
1628                            if(CE_List[z].FD == port) {
1629                                MessageHandler(z);
1630                            }
1631                        }
1632                    }
1633                }
1634            }
1635        }
1636    }       
1637
1638    /* Close sockets */
1639    close(cogEngSrv);
1640
1641    return;
1642}
1643
Note: See TracBrowser for help on using the browser.