root/vtcross/trunk/src/service_management_layer/ServiceManagementLayer.cpp @ 447

Revision 447, 66.0 KB (checked in by bhilburn, 15 years ago)

Fixed the print database command to operate like the rest of the code in
the SML source.

Line 
1/*
2 Copyright 2009 Virginia Polytechnic Institute and State University 
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8 http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15*/
16
17/* Inter-component communication handled by sockets and FD's. 
18 * Server support has been completely implemented and tested.
19 *
20 * Services are stored in a SQLite DB by the ID of the CE that registered them.  Service
21 * support has been completely implemented and tested.
22 *
23 * Missions are loaded from an XML file, connected with services provided by components,
24 * and run.  See the documentation for the "PerformActiveMission" below for important
25 * info.
26 */
27
28
29#include <cmath>
30#include <cstdio>
31#include <cstdlib>
32#include <cstring>
33#include <stdint.h>
34
35#include <arpa/inet.h>
36#include <iostream>
37#include <netinet/in.h>
38#include <netdb.h>
39#include <fcntl.h>
40#include <sqlite3.h>
41#include <string>
42#include <sys/ioctl.h>
43#include <sys/mman.h>
44#include <sys/socket.h>
45#include <sys/types.h>
46#include <sys/wait.h>
47
48#include "tinyxml/tinyxml.h"
49#include "tinyxml/tinystr.h"
50
51#include "vtcross/debug.h"
52#include "vtcross/error.h"
53#include "vtcross/common.h"
54#include "vtcross/components.h"
55#include "vtcross/containers.h"
56#include "vtcross/socketcomm.h"
57
58
59typedef struct services_s *services_DB;
60typedef struct data_s *data_DB;
61
62using namespace std;
63
64struct services_s {
65    string filename;
66    string tablename;
67    string command;
68    sqlite3 *db;
69    unsigned int num_columns;
70};
71
72struct data_s {
73    string filename;
74    string tablename;
75    string command;
76    sqlite3 *db;
77    unsigned int num_columns;
78};
79
80services_DB _services_DB;
81data_DB _data_DB;
82string _SML_Config;
83bool shellFound;
84
85/* Callback function used internally by some of the SQLite3 commands */
86int32_t
87callback(void *notUsed, int32_t argc, char **argv, char **azColName)
88{
89    for(size_t i = 0; i < argc; i++) {
90        LOG("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
91    }
92
93    LOG("\n");
94    return 0;
95}
96
97/* Useful for spotchecking what's in the database */
98void
99printDatabase()
100{
101    LOG("\n\n\n");
102    _data_DB->command = "select ";
103    _data_DB->command.append(_data_DB->tablename);
104    _data_DB->command.append(".* from ");
105    _data_DB->command.append(_data_DB->tablename);
106    _data_DB->command.append(";");
107
108    char *errorMsg;
109    int32_t rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
110    if((rc != SQLITE_OK) && (rc != 101))
111        WARNING("SQL error: %s\n", errorMsg);
112
113    LOG("database %s, table %s:\n", _data_DB->filename, _data_DB->tablename);
114    LOG("\n\n\n");
115}
116
117ServiceManagementLayer::ServiceManagementLayer()
118{
119    LOG("Creating Service Management Layer.\n");
120
121    shellSocketFD = -1;
122    numberOfCognitiveEngines = 0;
123    CE_Present = false;
124    cogEngSrv = 1;
125}
126
127/* Free and clear the DB's associated with this SML in the destructor.
128 *
129 * Note that exiting with an error condition will cause SML to not be destructed,
130 * resulting in the DB's staying in memory until the destructor is encountered in
131 * future executions. */
132ServiceManagementLayer::~ServiceManagementLayer()
133{
134    char *errorMsg;
135    int32_t rc;         /* sqlite command return code */
136
137    _services_DB->command = "drop table ";
138    _services_DB->command.append(_services_DB->tablename);
139    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
140    if((rc != SQLITE_OK) && (rc != 101))
141        WARNING("ServiceManagementLayer::Destructor services 'drop table' error: %s\n", errorMsg);
142
143    _services_DB->command = "vacuum";
144    rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
145    if((rc != SQLITE_OK) && (rc != 101))
146        WARNING("ServiceManagementLayer::Destructor services 'vacuum' error: %s\n", errorMsg);
147
148    free(_services_DB);
149
150    _data_DB->command = "drop table ";
151    _data_DB->command.append(_data_DB->tablename);
152    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
153    if((rc != SQLITE_OK) && (rc != 101))
154        WARNING("ServiceManagementLayer::Destructor data 'drop table' error: %s\n", errorMsg);
155
156    _data_DB->command = "vacuum";
157    rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
158    if((rc != SQLITE_OK) && (rc != 101))
159        WARNING("ServiceManagementLayer::Destructor data 'vacuum' error: %s\n", errorMsg);
160
161    free(_data_DB);
162}
163
164/* Note that sizes of CE_List, miss, and service are hardcoded for now.
165 * Also, their sizes are hardcoded into the code in various places; a fix
166 * for a future version. */
167ServiceManagementLayer::ServiceManagementLayer(const char* SML_Config, \
168    const char* serverName, const char* serverPort, int16_t clientPort)
169{
170    LOG("Creating Service Management Layer.\n");
171
172    _SML_Config = string(SML_Config);
173    SMLport = clientPort;
174
175    ConnectToShell(serverName, serverPort);
176    CE_List = new CE_Reg[10];
177
178    miss = new Mission[10];
179    for(size_t i = 0; i < 10; i++) {
180        miss[i].services = new Service[30];
181    }
182
183    Current_ID = 0;
184
185    LoadConfiguration(SML_Config, miss);
186
187    CreateServicesDB();
188    CreateDataDB();
189}
190
191/* CALLED BY: constructor
192 * INPUTS: <none>
193 * OUTPUTS: <none>
194 *
195 * DESCRIPTION: Create and initialize a DB to hold the services registered by components
196 */
197void
198ServiceManagementLayer::CreateServicesDB()
199{
200    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
201    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
202    int32_t rc;             /* sqlite command return code */
203
204    _services_DB = new services_s;
205    _services_DB->filename="Services_Table";
206    sqlite3_open(_services_DB->filename.c_str(), &(_services_DB->db));
207
208    char *cols[] = {(char *)"ID_Num", (char *)"Service_Name"};
209
210    _services_DB->tablename="Services";
211
212    /* ifprogram execution ends in anything other than a ordered shutdown, DB's will still
213     * be there for next run. Need to get rid of it so that old data isn't inadvertantly
214     * used in the next execution cycle. */
215    _services_DB->command = "DROP TABLE ifEXISTS Services;";     
216
217    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
218    if((rc != SQLITE_OK) && (rc != 101))
219        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
220
221    rc = sqlite3_step(ppStmt);
222    if((rc != SQLITE_OK) && (rc != 101))
223        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
224
225    _services_DB->num_columns = 2;
226
227    /* Generate command */
228    _services_DB->command="CREATE TABLE ";
229    _services_DB->command.append(_services_DB->tablename);
230    _services_DB->command.append("(");
231    _services_DB->command.append(cols[0]);
232    _services_DB->command.append(" INT, ");
233    _services_DB->command.append(cols[1]);
234    _services_DB->command.append(" TEXT);");
235
236    /* Execute create table command */
237    rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), 128, &ppStmt, &pzTail);
238    if((rc != SQLITE_OK) && (rc != 101))
239        WARNING("ServiceManagementLayer::CreateServicesDB 'prepare_stmt' error %d\n", rc);
240
241    rc = sqlite3_step(ppStmt);
242    if((rc != SQLITE_OK) && (rc != 101))
243        WARNING("ServiceManagementLayer::CreateServicesDB 'step' error\n");
244}
245
246/* CALLED BY: constructor
247 * INPUTS: <none>
248 * OUTPUTS: <none>
249 *
250 * DESCRIPTION: Create and initialize a DB to hold the data sent by components
251 */
252void
253ServiceManagementLayer::CreateDataDB()
254{
255    sqlite3_stmt *ppStmt;   /* OUT: Statement handle */
256    const char *pzTail;     /* OUT: Pointer to unused portion of zSql */
257    int32_t rc;             /* sqlite command return code */
258
259    _data_DB = new data_s;
260
261    _data_DB->filename="Data_Table";
262    sqlite3_open(_data_DB->filename.c_str(), &(_data_DB->db));
263
264    char *cols[] = {(char *)"Tag", (char *)"Data"};
265
266    _data_DB->tablename = "Data";
267    _data_DB->command = "DROP TABLE ifEXISTS Data;";     
268
269    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
270    if((rc != SQLITE_OK) && (rc != 101))
271        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
272
273    rc = sqlite3_step(ppStmt);
274    if((rc != SQLITE_OK) && (rc != 101))
275        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
276
277    _data_DB->num_columns = 2;
278
279    /* Generate command */
280    _data_DB->command = "CREATE TABLE ";
281    _data_DB->command.append(_data_DB->tablename);
282    _data_DB->command.append("(");
283    _data_DB->command.append(cols[0]);
284
285    /* First column is the name of the data (corresponding to the name of the output/input pair)
286     * It is the primary key so any subsequent data with the same name will replace the row. */
287    _data_DB->command.append(" TEXT PRIMARY KEY ON CONFLICT REPLACE, ");
288    _data_DB->command.append(cols[1]);
289    _data_DB->command.append(" TEXT);");
290
291    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), 128, &ppStmt, &pzTail);
292    if((rc != SQLITE_OK) && (rc != 101))
293        WARNING("ServiceManagementLayer::CreateDataDB 'prepare_stmt' error %d\n", rc);
294
295    rc = sqlite3_step(ppStmt);
296    if((rc != SQLITE_OK) && (rc != 101))
297        WARNING("ServiceManagementLayer::CreateDataDB 'step' error\n");
298}
299
300/* CALLED BY: MessageHandler
301 * INPUTS: <none>
302 * OUTPUTS: <none>
303 *
304 * DESCRIPTION: Sends a message identifying this component as an SML to the Shell
305 */
306void
307ServiceManagementLayer::SendComponentType()
308{
309    SendMessage(shellSocketFD, "response_sml");
310    LOG("SML responded to GetRemoteComponentType query.\n");
311}
312
313/* CALLED BY: constructor
314 * INPUTS: |serverName| the IPv4 name of the server (127.0.0.1 for localhost)
315 *         |serverPort| the port on the server to connect to
316 * OUTPUTS: <none>
317 *
318 * DESCRIPTION: Connecting to the shell takes 2 steps
319 * 1) Establish a client socket for communication
320 * 2) Run the initial Registration/handshake routine
321 */
322void
323ServiceManagementLayer::ConnectToShell(const char* serverName, \
324        const char* serverPort)
325{
326    shellSocketFD = ClientSocket(serverName, serverPort);
327    RegisterComponent();
328}
329
330/* CALLED BY: StartSMLServer
331 * INPUTS: |ID| The ID number of the CE that has a message wating
332 * OUTPUTS: <none>
333 *
334 * DESCRIPTION: Called whenever a socket is identified as being ready for communication
335 *              This funciton reads the message and calls the appropriate helper
336 */
337void
338ServiceManagementLayer::MessageHandler(int32_t ID)
339{
340    char buffer[256];   
341    memset(buffer, 0, 256); 
342    int32_t _FD; 
343   
344    if(ID != -1)
345        _FD = CE_List[ID].FD;
346    else
347        _FD = shellSocketFD;
348
349    ReadMessage(_FD, buffer);
350   
351    //--------Policy Engine Stuff - no policy engine support in this version-------//
352
353    //printf("********* %s **********\n", buffer);
354    // TODO
355    // ifwe send integer op codes rather than strings, this process will be
356    // MUCH faster since instead of donig string compares we can simply
357    // switch on the integer value...
358    /*if(strcmp(buffer, "register_service") == 0) {
359        if(strcmp(buffer, "policy_geo") == 0) {
360        }
361        else if(strcmp(buffer, "policy_time") == 0) {
362        }
363        else if(strcmp(buffer, "policy_spectrum") == 0) {
364        }
365        else if(strcmp(buffer, "policy_spacial") == 0) {
366        }
367    }
368    else if(strcmp(buffer, "deregister_service") == 0) {
369        if(strcmp(buffer, "policy_geo") == 0) {
370        }
371        else if(strcmp(buffer, "policy_time") == 0) {
372        }
373        else if(strcmp(buffer, "policy_spectrum") == 0) {
374        }
375        else if(strcmp(buffer, "policy_spacial") == 0) {
376        }
377    }*/
378
379    //Go down the list to call the appropriate function
380    if(strcmp(buffer, "query_component_type") == 0) {
381        SendComponentType();
382    }
383    else if(strcmp(buffer, "reset_sml") == 0) {
384        Reset();
385    }
386    else if(strcmp(buffer, "shutdown_sml") == 0) {
387        Shutdown();
388    }
389    else if(strcmp(buffer, "register_engine_cognitive") == 0) {
390        RegisterCognitiveEngine(ID);
391    }
392    else if(strcmp(buffer, "register_service") == 0) {
393        ReceiveServices(ID);
394    }
395    else if(strcmp(buffer, "send_component_type") == 0) {
396        SendComponentType();
397    }
398    else if(strcmp(buffer, "list_services") == 0) {
399        ListServices();
400    }
401    else if(strcmp(buffer, "set_active_mission") == 0) {
402        SetActiveMission();
403    }
404    else if(strcmp(buffer, "request_optimization") == 0) {
405        PerformActiveMission();
406    }
407    else if(strcmp(buffer, "deregister_engine_cognitive") == 0) {
408        DeregisterCognitiveEngine(ID);
409    }
410    else if(strcmp(buffer, "deregister_service") == 0) {
411        DeregisterServices(ID);
412    }
413}
414
415//TODO Finish
416/* CALLED BY: MessageHandler
417 * INPUTS: <none>
418 * OUTPUTS: <none>
419 *
420 * DESCRIPTION: Deregisters the component from the Shell.
421 */
422void
423ServiceManagementLayer::Shutdown()
424{
425    DeregisterComponent();
426}
427
428//TODO Finish
429/* CALLED BY: MessageHandler
430 * INPUTS: <none>
431 * OUTPUTS: <none>
432 *
433 * DESCRIPTION: Deregisters the component from the Shell
434 */
435void
436ServiceManagementLayer::Reset()
437{
438    DeregisterComponent();
439    ReloadConfiguration();
440}
441
442/* CALLED BY: ConnectToShell
443 * INPUTS: <none>
444 * OUTPUTS: <none>
445 *
446 * DESCRIPTION: Sends the registration message to the Shell
447 */
448void
449ServiceManagementLayer::RegisterComponent()
450{
451    SendMessage(shellSocketFD, "register_sml");
452    LOG("ServiceManagementLayer:: Registration message sent.\n");
453}
454
455/* CALLED BY: Shutdown
456 * INPUTS: <none>
457 * OUTPUTS: <none>
458 *
459 * DESCRIPTION: Closes the client socket with the shell, sends a deregstration message
460 */
461void
462ServiceManagementLayer::DeregisterComponent()
463{
464    SendMessage(shellSocketFD, "deregister_sml");
465    LOG("ServiceManagementLayer:: Deregistration message sent.\n");
466
467    shutdown(shellSocketFD, 2);
468    close(shellSocketFD);
469    shellSocketFD = -1;
470    LOG("ServiceManagementLayer:: Shell socket closed.\n");
471}
472
473
474/* CALLED BY: RegisterCognitiveEngine
475 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
476 * OUTPUTS: <none>
477 *
478 * DESCRIPTION: Streams config data directly from the shell to the CE, and checks
479 * for an "ack" message from the CE after every sent message
480 * to know when to stop communication.
481 *
482 * NOTE: Modified to check the incoming message buffer rather than the outgoing
483 * message buffer to avoid a portion of the delay. May change this again to handle
484 * data more inteligently, taking advantage of it's properties.
485 */
486void
487ServiceManagementLayer::TransferRadioConfiguration(int32_t ID)
488{
489    struct timeval selTimeout;
490    fd_set sockSet;
491    int32_t rc = 1;
492    char buffer[256];
493
494    /* Send data until the CE sends an ACK message back */
495    while(rc != 0) {
496        memset(buffer, 0, 256);
497
498        /* Receive data from Shell */
499        ReadMessage(shellSocketFD, buffer);
500
501        /* Send data to CE */
502        SendMessage(CE_List[ID].FD, buffer);
503        FD_ZERO(&sockSet);
504        FD_SET(shellSocketFD, &sockSet);
505        selTimeout.tv_sec = 0;
506        selTimeout.tv_usec = 5000;
507
508        /* Check ifthere is a message on the shell ready to be processed */
509        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
510    }
511
512    memset(buffer, 0, 256);
513    ReadMessage(CE_List[ID].FD, buffer);
514    SendMessage(shellSocketFD, buffer);
515}
516
517
518/* CALLED BY: RegisterCognitiveEngine
519 * INPUTS: |ID| The ID number of the component where the data is to be transfered to
520 * OUTPUTS: <none>
521 *
522 * DESCRIPTION: Simmilar to TransferRadioConfig, just with Experience data
523 *
524 * NOTE: Modified to check the incoming message buffer rather than the outgoing
525 * message buffer to avoid a portion of the delay. May change this again to handle
526 * data more inteligently, taking advantage of it's properties.
527 */
528void
529ServiceManagementLayer::TransferExperience(int32_t ID)
530{
531    struct timeval selTimeout;
532    fd_set sockSet;
533    int32_t rc = 1;
534    char buffer[256];
535    /* Send data until the CE sends an ACK message back */
536    while(rc != 0) {
537        memset(buffer, 0, 256);
538
539        /* Receive data from Shell */
540        ReadMessage(shellSocketFD, buffer);
541
542        /* Send data to CE */
543        SendMessage(CE_List[ID].FD, buffer);
544        FD_ZERO(&sockSet);
545        FD_SET(shellSocketFD, &sockSet);
546        selTimeout.tv_sec = 0;
547        selTimeout.tv_usec = 5000;
548
549        /* Check ifthere is a message on the shell ready to be processed */
550        rc = select(shellSocketFD + 1, &sockSet, NULL, NULL, &selTimeout);
551    }
552
553    memset(buffer, 0, 256);
554    ReadMessage(CE_List[ID].FD, buffer);
555    SendMessage(shellSocketFD, buffer);
556}
557
558/* CALLED BY: MessageHandler
559 * INPUTS: |ID| The ID number of the component where service is located
560 * OUTPUTS: <none>
561 *
562 * DESCRIPTION: Inserts a service into the DB with the ID of the component where it exists
563 */
564void
565ServiceManagementLayer::ReceiveServices(int32_t ID)
566{
567    char buffer[256];
568    memset(buffer, 0, 256);
569    ReadMessage(CE_List[ID].FD, buffer);
570
571    char *cols[] = {(char *) "ID_Num", (char *) "Service_Name"};
572
573    /* Generate command */
574    _services_DB->command = "insert into ";
575    _services_DB->command.append(_services_DB->tablename);
576    _services_DB->command.append(" (");
577    _services_DB->command.append(cols[0]);
578    _services_DB->command.append(", ");
579    _services_DB->command.append(cols[1]);
580    _services_DB->command.append(") ");
581    _services_DB->command.append(" values(");
582
583    char temp[3];
584    memset(temp, 0, 3);
585    sprintf(temp, "%d", ID);
586
587    _services_DB->command.append(temp);
588    _services_DB->command.append(", '");
589    _services_DB->command.append(buffer);
590    _services_DB->command.append("');");
591   
592    /* Execute add command */
593    char *errorMsg;
594    int rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
595    if((rc != SQLITE_OK) && (rc != 101))
596        WARNING("ServiceManagementLayer::RecieveServices DB Error %s\n", errorMsg);
597}
598
599/* CALLED BY: MessageHandler
600 * INPUTS: <none>
601 * OUTPUTS: <none>
602 *
603 * DESCRIPTION: This method associates the services that components provide with the
604 * services that are requested in the mission. Each service in the mission is given
605 * the ID and FD of a component that has registered to provide that service. Deregistration
606 * is okay until this method is called without a reload, but ifderegistration occurs after this
607 * method is called it needs to be called again even ifother engines also provide the services
608 */
609void
610ServiceManagementLayer::SetActiveMission()
611{
612    char buffer[256];
613    memset(buffer, 0, 256);
614    ReadMessage(shellSocketFD, buffer);
615
616    uint32_t missID = atoi(buffer);
617    for(activeMission = 0; activeMission < 10; activeMission++) {
618        /* Find the active mission by comparing mission ID's */
619        if(miss[activeMission].missionID == missID)
620            break;
621    }
622
623    LOG("ServiceManagementLayer:: Received Set Active Mission command: %i.\n", missID);
624
625    /* For each service in the mission */
626    for(size_t i = 0; i < miss[activeMission].numServices; i++) {
627        /* Check whether the current service is an actual service or a conditional */
628        if(miss[activeMission].services[i].name.compare("if") && \
629                miss[activeMission].services[i].name.compare("dowhile") && \
630                miss[activeMission].services[i].name.compare("shell")) {
631                /* ifit is a service, search the database of registered services to find
632                 * the ID of the component that registered it */
633                _services_DB->command="select ";
634                _services_DB->command.append(_services_DB->tablename);
635                _services_DB->command.append(".* from ");
636                _services_DB->command.append( _services_DB->tablename);
637                _services_DB->command.append(" where Service_Name=='");
638                _services_DB->command.append(miss[activeMission].services[i].name);
639                _services_DB->command.append("';");
640   
641                sqlite3_stmt * pStatement;
642                int32_t rc = sqlite3_prepare_v2(_services_DB->db, _services_DB->command.c_str(), \
643                        -1, &pStatement, NULL);
644                if(rc == SQLITE_OK) {
645                    if(sqlite3_step(pStatement) == SQLITE_ROW)
646                        miss[activeMission].services[i].componentID = sqlite3_column_int(pStatement, 0);
647                    else {
648                        WARNING("services_DB:: Mission requires service %s ", \
649                                miss[activeMission].services[i].name.c_str());
650                        WARNING("not provided by any connected component.\n");
651                        rc = 31337;
652                    }
653                } else {
654                    WARNING("services_DB:: Error executing SQL statement. rc = %i\n%s\n", \
655                            rc, _services_DB->command.c_str());
656                }
657
658                sqlite3_finalize(pStatement);
659                miss[activeMission].services[i].socketFD = \
660                    CE_List[miss[activeMission].services[i].componentID].FD;
661        }
662        /* TODO Nothing to be done for conditionals at this stage */
663    }
664 
665    SendMessage(shellSocketFD, "ack");
666    LOG("ServiceManagementLayer:: Done setting active mission.\n");
667}
668
669/* CALLED BY: PerformActiveMission
670 * INPUTS: |sourceID| ID of the service that is being processed
671 * OUTPUTS: <none>
672 *
673 * DESCRIPTION: This is a helper method for the "PerformActiveMission" function
674 * NOTE: This function has changed drastically from the previous implementation
675 *
676 * Takes an ID of a service. For that service, finds inputs in DB and forwords
677 * those on to the engine after sending comm-starting messages. Afterwords, listenes
678 * for the outputs so that it can store those in the database for future services or
679 * the overall output
680 */
681void
682ServiceManagementLayer::TransactData(int32_t sourceID)
683{
684    char buffer[256];
685    std::string data;
686    char *cols[] = {(char *) "Tag", (char *) "Data"};
687    char *token;
688
689   /* Send a message directly to the shell */
690   if(miss[activeMission].services[sourceID].name.find("shell") != string::npos) {
691       shellFound=true;
692
693       int32_t k = 0;
694       while((k < 10) && (!miss[activeMission].input[k].empty())) {
695           k++;
696       }
697
698       sprintf(buffer, "%d", k);
699       SendMessage(shellSocketFD, buffer);
700       for(int32_t t = 0; t < k; t++) {
701           memset(buffer, 0 , 256);
702           _data_DB->command="select ";
703           _data_DB->command.append(_data_DB->tablename);
704           _data_DB->command.append(".* from ");
705           _data_DB->command.append(_data_DB->tablename);
706           _data_DB->command.append(" where Tag=='");
707           _data_DB->command.append(miss[activeMission].input[t]);
708           _data_DB->command.append("';");
709           sqlite3_stmt * pStatement;
710
711           int32_t rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
712                   -1, &pStatement, NULL);
713           if(rc == SQLITE_OK) {
714               if(sqlite3_step(pStatement) == SQLITE_ROW)
715                   data=((const char*) sqlite3_column_text(pStatement, 1));
716               else {
717                   LOG("3data_DB:: Data not yet in DB., %s\n", _data_DB->command.c_str());
718                   rc = 31337;
719               }
720           }
721           else {
722               WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
723                       rc,_data_DB->command.c_str());
724           }
725
726           sqlite3_finalize(pStatement);
727           token = strtok((char *) data.c_str(), "@");
728           token = strtok(NULL, "@");
729           SendMessage(shellSocketFD, token);
730           token = strtok(NULL, "@");
731           SendMessage(shellSocketFD, token);
732       }
733
734       return;
735   }
736
737    /* ifthis is a service command and not a shell command... */
738    /* Transmission starting messages */
739    SendMessage(miss[activeMission].services[sourceID].socketFD, "request_optimization_service");
740    SendMessage(miss[activeMission].services[sourceID].socketFD, \
741            miss[activeMission].services[sourceID].name.c_str());
742}
743
744/* CALLED BY: MessageHandler
745 * INPUTS: <none>
746 * OUTPUTS: <none>
747 *
748 * DESCRIPTION: This function works by first sending the inputs from the shell to
749 * the appropriate components. The first service should begin immeadiately, as
750 * should any others who have all of their input parameters. When they complete,
751 * the output path is found and the data is transfered as it becomes available
752 * Presumably at this point the second function has all of it's parameters, so it
753 * begins to compute, and the cycle repeats.
754 *
755 * Rules for active missions (currently)
756 * -Five inputs/outputs per service and per mission
757 * -All ordering constraints have been relaxed in this version; all data is stored
758 *  locally and only sent when requested
759 * -ifand while support fully implemented - up to three levels (if's can be nested, but dowhiles cannot)
760 * -For dowhiles, assumes loop condition determined on last line
761 *
762 * -IMPORTANT: DB uses '@' to seperate individual statements; using '@' in the data
763 *  stream will result in incorrect behavior
764 */
765void
766ServiceManagementLayer::PerformActiveMission()
767{
768    shellFound = false;
769    std::string data_param;
770    std::string data_obsv;
771    std::string data;
772    std::string input;
773    std::string check;
774
775    char buffer[256];
776    char buffer1[256];
777    std::string token, token2;
778    std::string data2;
779
780    int32_t rc;
781    char *errorMsg;
782    char *cols[] = {(char *) "Tag", (char *) "Data"};
783
784    LOG("ServiceManagementLayer:: Received PerformActiveMission command.\n");
785
786    /* Get the inputs */
787    memset(buffer, 0, 256);
788    ReadMessage(shellSocketFD, buffer);
789
790    /* Receive Set of Parameters */
791    memset(buffer, 0, 256);
792    ReadMessage(shellSocketFD, buffer);
793    int32_t t = atoi(buffer);
794    for(size_t m = 0; m < t; m++) {
795        memset(buffer1, 0, 256);
796        ReadMessage(shellSocketFD, buffer1);
797        _data_DB->command="insert into ";
798        _data_DB->command.append(_data_DB->tablename);
799        _data_DB->command.append(" (");
800        _data_DB->command.append(cols[0]);
801        _data_DB->command.append(", ");
802        _data_DB->command.append(cols[1]);
803        _data_DB->command.append(") ");
804
805        memset(buffer, 0, 256);
806        ReadMessage(shellSocketFD, buffer);
807        _data_DB->command.append(" values('");
808        _data_DB->command.append(buffer1);
809        _data_DB->command.append("', '1@");
810        _data_DB->command.append(buffer1);
811        _data_DB->command.append("@");
812        _data_DB->command.append(buffer);
813        _data_DB->command.append("');");
814
815        rc = sqlite3_exec(_data_DB->db, _data_DB->command.c_str(), callback, 0, &errorMsg);
816        if((rc != SQLITE_OK) && (rc != 101))
817           WARNING("SQL error: %s\n", errorMsg);
818    }
819
820    int32_t numstatements[3] = {0 ,0 ,0};
821    for(size_t i; i < miss[activeMission].numServices; i++) {
822        if(miss[activeMission].services[i].name.compare("if") == 0) {
823            input.clear();
824            check.clear();
825
826            for(size_t t = 0; t < 10; t++) {
827                if(!miss[activeMission].services[i].output[t].empty()) {
828                    input = miss[activeMission].services[i - numstatements[0] - 1].output[t];
829
830                    _data_DB->command="SELECT ";
831                    _data_DB->command.append(_data_DB->tablename);
832                    _data_DB->command.append(".* from ");
833                    _data_DB->command.append(_data_DB->tablename);
834                    _data_DB->command.append(" where Tag=='");
835                    _data_DB->command.append(input);
836                    _data_DB->command.append("';");
837
838                    sqlite3_stmt *pStatement;
839                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
840                            -1, &pStatement, NULL);
841                    if(rc == SQLITE_OK) {
842                        if(sqlite3_step(pStatement) == SQLITE_ROW)
843                             data = (const char *) sqlite3_column_text(pStatement, 1);
844                        else {
845                            WARNING("1 data_DB:: Data not yet in DB.\n");
846                            rc=31337;
847                        }
848                    } else {
849                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
850                                rc,_data_DB->command.c_str());
851                    }
852
853                    sqlite3_finalize(pStatement);
854                    int32_t pos = data.find_last_of("@", data.length() - 2);
855                    token = data.substr(pos + 1);
856                    token.erase(token.length() - 1);
857                    data.clear();
858                    break;
859                }
860            }
861
862            bool doit = false;
863            if(miss[activeMission].services[i].output[t].find(">") != string::npos) {
864                std::string data2;
865                _data_DB->command="SELECT ";
866                _data_DB->command.append(_data_DB->tablename);
867                _data_DB->command.append(".* from ");
868                _data_DB->command.append(_data_DB->tablename);
869                _data_DB->command.append(" where Tag=='");
870                _data_DB->command.append(miss[activeMission].services[i].output[t].erase(0, 1));
871                _data_DB->command.append("';");
872                sqlite3_stmt *pStatement;
873
874                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
875                        -1, &pStatement, NULL);
876                if(rc == SQLITE_OK) {
877                    if(sqlite3_step(pStatement) == SQLITE_ROW)
878                         data2 = (const char *) sqlite3_column_text(pStatement, 1);
879                    else {
880                        WARNING("2 data_DB:: Data not yet in DB.\n");
881                        rc = 31337;
882                    }
883                } else {
884                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
885                            rc, _data_DB->command.c_str());
886                }
887
888                sqlite3_finalize(pStatement);
889
890                int32_t pos = data2.find_last_of("@", data2.length() - 2);
891                token2 = data2.substr(pos + 1);
892                token2.erase(token2.length() - 1);
893                if(atof(token.c_str()) > atof(token2.c_str()))
894                    doit = true;
895            }
896            else if(miss[activeMission].services[i].output[t].find(token) != string::npos)
897                doit = true;
898
899            if(doit) {
900                for(size_t k = i + 1; k <= i+miss[activeMission].services[i].num_conds; k++) {
901                    if(miss[activeMission].services[k].name.compare("if") == 0) {
902                        input.clear();
903                        check.clear();
904                        for(size_t t = 0; t < 10; t++) {
905                            if(!miss[activeMission].services[k].output[t].empty()) {
906                                input = miss[activeMission].services[k - numstatements[1] - 1].output[t];
907                                _data_DB->command="SELECT ";
908                                _data_DB->command.append(_data_DB->tablename);
909                                _data_DB->command.append(".* from ");
910                                _data_DB->command.append(_data_DB->tablename);
911                                _data_DB->command.append(" where Tag=='");
912                                _data_DB->command.append(input);
913                                _data_DB->command.append("';");
914
915                                sqlite3_stmt *pStatement;
916                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
917                                        -1, &pStatement, NULL);
918                                if(rc == SQLITE_OK) {
919                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
920                                         data = (const char *) sqlite3_column_text(pStatement, 1);
921                                    else {
922                                        WARNING("3 data_DB:: Data not yet in DB.\n");
923                                        rc = 31337;
924                                    }
925                                } else {
926                                    WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
927                                            rc,_data_DB->command.c_str());
928                                }
929
930                                sqlite3_finalize(pStatement);
931                                int32_t pos = data.find_last_of("@", data.length() - 2);
932                                token = data.substr(pos + 1);
933                                token.erase(token.length() - 1);
934                                break;
935                            }
936                        }
937
938                        bool doit = false;
939                        if(miss[activeMission].services[k].output[t].find(">") != string::npos) {
940                            std::string data2;
941                            _data_DB->command="SELECT ";
942                            _data_DB->command.append(_data_DB->tablename);
943                            _data_DB->command.append(".* from ");
944                            _data_DB->command.append(_data_DB->tablename);
945                            _data_DB->command.append(" where Tag=='");
946                            _data_DB->command.append(miss[activeMission].services[k].output[t].erase(0, 1));
947                            _data_DB->command.append("';");
948
949                            sqlite3_stmt *pStatement;
950                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), \
951                                    -1, &pStatement, NULL);
952                            if(rc == SQLITE_OK) {
953                                if(sqlite3_step(pStatement) == SQLITE_ROW)
954                                     data2 = (const char *) sqlite3_column_text(pStatement, 1);
955                                else {
956                                    WARNING("4 data_DB:: Data not yet in DB.\n");
957                                    rc = 31337;
958                                }
959                            } else {
960                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
961                                        rc, _data_DB->command.c_str());
962                            }
963
964                            sqlite3_finalize(pStatement);
965                            int32_t pos = data2.find_last_of("@", data2.length() - 2);
966                            token2 = data2.substr(pos + 1);
967                            token2.erase(token2.length() - 1);
968                            if(atof(token.c_str()) > atof(token2.c_str()))
969                                doit = true;
970                        }
971                        else if(miss[activeMission].services[k].output[t].find(token) != string::npos)
972                            doit=true;
973
974                        if(doit) {
975                            for(size_t j = k + 1; j <= k+miss[activeMission].services[k].num_conds; j++) {
976                                if(miss[activeMission].services[j].name.compare("if") == 0) {
977                                    input.clear();
978                                    check.clear();
979                                    for(t = 0; t < 10; t++) {
980                                        if(!miss[activeMission].services[j].output[t].empty()) {
981                                            input = miss[activeMission].services[j - numstatements[2] - 1].output[t];
982
983                                            _data_DB->command="SELECT ";
984                                            _data_DB->command.append(_data_DB->tablename);
985                                            _data_DB->command.append(".* from ");
986                                            _data_DB->command.append(_data_DB->tablename);
987                                            _data_DB->command.append(" where Tag=='");
988                                            _data_DB->command.append(input);
989                                            _data_DB->command.append("';");
990
991                                            sqlite3_stmt *pStatement;
992
993                                            rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
994                                            if(rc == SQLITE_OK) {
995                                                if(sqlite3_step(pStatement) == SQLITE_ROW)
996                                                     data = (const char *) sqlite3_column_text(pStatement, 1);
997                                                else {
998                                                    WARNING("5 data_DB:: Data not yet in DB.\n");
999                                                    rc = 31337;
1000                                                }
1001                                            } else {
1002                                                WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1003                                                        rc, _data_DB->command.c_str());
1004                                            }
1005
1006                                            sqlite3_finalize(pStatement);
1007                                            int32_t pos = data.find_last_of("@", data.length()-2);
1008                                            token = data.substr(pos+1);
1009                                            token.erase(token.length()-1);
1010                                            data.clear();
1011                                            break;
1012                                        }
1013                                    }
1014
1015                                    bool doit = false;
1016                                    if(miss[activeMission].services[j].output[t].find(">") != string::npos) {
1017                                        _data_DB->command="SELECT ";
1018                                        _data_DB->command.append(_data_DB->tablename);
1019                                        _data_DB->command.append(".* from ");
1020                                        _data_DB->command.append(_data_DB->tablename);
1021                                        _data_DB->command.append(" where Tag=='");
1022                                        _data_DB->command.append(miss[activeMission].services[j].output[t].erase(0, 1));
1023                                        _data_DB->command.append("';");
1024
1025                                        sqlite3_stmt *pStatement;
1026                                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1027                                        if(rc == SQLITE_OK) {
1028                                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1029                                                 data2 = (const char *) sqlite3_column_text(pStatement, 1);
1030                                            else {
1031                                                WARNING("6 data_DB:: Data not yet in DB.\n");
1032                                                rc=31337;
1033                                            }
1034                                        } else {
1035                                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1036                                                    rc, _data_DB->command.c_str());
1037                                        }
1038
1039                                        sqlite3_finalize(pStatement);
1040
1041                                        int32_t pos = data2.find_last_of("@", data2.length()-2);
1042                                        token2 = data2.substr(pos + 1);
1043                                        token2.erase(token2.length()-1);
1044
1045                                        if(atof(token.c_str()) > atof(token2.c_str()))
1046                                            doit=true;
1047
1048                                        data.clear();
1049                                    }
1050
1051                                    else if(miss[activeMission].services[j].output[t].find(token) != string::npos)
1052                                        doit = true;
1053
1054                                    if(doit) {
1055                                        for(size_t l = j + 1; l <= j + miss[activeMission].services[j].num_conds; l++) {
1056                                            TransactData(l);
1057                                        }
1058                                    }
1059
1060                                    numstatements[2] += miss[activeMission].services[j].num_conds + 1;
1061                                    j += miss[activeMission].services[j].num_conds;
1062                                } else if(miss[activeMission].services[j].name.compare("dowhile") == 0) {
1063                                    numstatements[0]=0;
1064
1065                                    while(true) {
1066                                        uint32_t l;
1067                                        for(l = j + 1; l <= j+miss[activeMission].services[j].num_conds; l++) {
1068                                            TransactData(l);
1069                                        }
1070
1071                                        data.clear();
1072                                        input.clear();
1073                                        check.clear();
1074
1075                                        int32_t t;
1076                                        for(t = 0; t < 10; t++) {
1077                                            if(!miss[activeMission].services[j].output[t].empty()){
1078                                                input=miss[activeMission].services[l-2].output[t];
1079                                                _data_DB->command="SELECT ";
1080                                                _data_DB->command.append(_data_DB->tablename);
1081                                                _data_DB->command.append(".* from ");
1082                                                _data_DB->command.append(_data_DB->tablename);
1083                                                _data_DB->command.append(" where Tag=='");
1084                                                _data_DB->command.append(input);
1085                                                _data_DB->command.append("';");
1086                                                sqlite3_stmt *pStatement;
1087
1088                                                rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1089                                                if(rc == SQLITE_OK){
1090                                                    if(sqlite3_step(pStatement) == SQLITE_ROW)
1091                                                         data = (const char *) sqlite3_column_text(pStatement, 1);
1092                                                    else {
1093                                                        WARNING("7 data_DB:: Data not yet in DB.: %s\n", _data_DB->command.c_str());
1094                                                        rc = 31337;
1095                                                    }
1096                                                } else {
1097                                                    WARNING("data_DB:: SQL statement error. rc = %i\n%s\n", \
1098                                                            rc, _data_DB->command.c_str());
1099                                                }
1100
1101                                                sqlite3_finalize(pStatement);
1102                                                int32_t pos = data.find_last_of("@", data.length() - 2);
1103                                                token = data.substr(pos + 1);
1104                                                token.erase(token.length() - 1);
1105                                                break;
1106                                            }
1107                                        }
1108
1109                                        if(miss[activeMission].services[j].output[t].find(token) == string::npos) {
1110                                            break;
1111                                        }
1112                                    }
1113
1114                                    j += miss[activeMission].services[j].num_conds;
1115                                } else {
1116                                    numstatements[2] = 0;
1117                                    TransactData(j);
1118                                }
1119                            }
1120                        }
1121
1122                        numstatements[1] += miss[activeMission].services[k].num_conds + 1;
1123                        k += miss[activeMission].services[k].num_conds;
1124                    } else if(miss[activeMission].services[k].name.compare("dowhile") == 0) {
1125                        numstatements[0] = 0;
1126                        while(true) {
1127                            int32_t j;
1128                            for(j = k + 1; j <= k + miss[activeMission].services[k].num_conds; j++) {
1129                                TransactData(j);
1130                            }
1131
1132                            data.clear();
1133                            input.clear();
1134                            check.clear();
1135                            int32_t t;
1136                            for(t = 0; t < 10; t++) {
1137                                if(!miss[activeMission].services[k].output[t].empty()) {
1138                                    input = miss[activeMission].services[j - 1].output[t];
1139                                    _data_DB->command="SELECT ";
1140                                    _data_DB->command.append(_data_DB->tablename);
1141                                    _data_DB->command.append(".* from ");
1142                                    _data_DB->command.append(_data_DB->tablename);
1143                                    _data_DB->command.append(" where Tag=='");
1144                                    _data_DB->command.append(input);
1145                                    _data_DB->command.append("';");
1146
1147                                    sqlite3_stmt *pStatement;
1148                                    rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1149                                    if(rc == SQLITE_OK) {
1150                                        if(sqlite3_step(pStatement) == SQLITE_ROW)
1151                                             data = (const char *) sqlite3_column_text(pStatement, 1);
1152                                        else {
1153                                            WARNING("8 data_DB:: Data not yet in DB.\n");
1154                                            rc = 31337;
1155                                        }
1156                                    } else {
1157                                        WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1158                                                rc, _data_DB->command.c_str());
1159                                    }
1160
1161                                    sqlite3_finalize(pStatement);
1162                                    int32_t pos = data.find_last_of("@", data.length() - 2);
1163                                    token = data.substr(pos + 1);
1164                                    token.erase(token.length() - 1);
1165                                    break;
1166                                }
1167                            }
1168
1169                            if(miss[activeMission].services[k].output[t].find(token) == string::npos) {
1170                                break;
1171                            }
1172                        }
1173
1174                        k += miss[activeMission].services[k].num_conds;
1175                    } else{
1176                        numstatements[1] = 0;
1177                        TransactData(k);
1178                    }
1179                }
1180            }
1181
1182            numstatements[0] += miss[activeMission].services[i].num_conds + 1;
1183            i += miss[activeMission].services[i].num_conds;
1184        } else if(miss[activeMission].services[i].name.compare("dowhile") == 0) {
1185            numstatements[0] = 0;
1186            while(true) {
1187                uint32_t k;
1188                for(k = i + 1; k <= i + miss[activeMission].services[i].num_conds; k++){
1189                    TransactData(k);
1190                }
1191
1192                data.clear();
1193                input.clear();
1194                check.clear();
1195                int32_t t;
1196                for(t = 0; t < 10; t++){
1197                    if(!miss[activeMission].services[i].output[t].empty()) {
1198                        input = miss[activeMission].services[k - 1].output[t];
1199                        _data_DB->command="SELECT ";
1200                        _data_DB->command.append(_data_DB->tablename);
1201                        _data_DB->command.append(".* from ");
1202                        _data_DB->command.append(_data_DB->tablename);
1203                        _data_DB->command.append(" where Tag=='");
1204                        _data_DB->command.append(input);
1205                        _data_DB->command.append("';");
1206
1207                        sqlite3_stmt *pStatement;
1208                        rc = sqlite3_prepare_v2(_data_DB->db, _data_DB->command.c_str(), -1, &pStatement, NULL);
1209                        if(rc == SQLITE_OK) {
1210                            if(sqlite3_step(pStatement) == SQLITE_ROW)
1211                                 data = (const char *) sqlite3_column_text(pStatement, 1);
1212                            else {
1213                                WARNING("10data_DB:: Data not yet in DB.\n");
1214                                rc = 31337;
1215                            }
1216                        } else {
1217                            WARNING("data_DB:: Error executing SQL statement. rc = %i\n%s\n", \
1218                                    rc, _data_DB->command.c_str());
1219                        }
1220
1221                        sqlite3_finalize(pStatement);
1222                        int32_t pos = data.find_last_of("@", data.length()-2);
1223                        token = data.substr(pos + 1);
1224                        token.erase(token.length() - 1);
1225                        break;
1226                    }
1227                }
1228
1229                if(miss[activeMission].services[i].output[t].find(token) == string::npos) {
1230                    break;
1231                }
1232            }
1233
1234            i += miss[activeMission].services[i].num_conds;
1235        } else{
1236            numstatements[0] = 0;
1237            TransactData(i);
1238        }
1239    }
1240
1241    int32_t i = 0;
1242    data.clear();
1243
1244    if(!shellFound) {
1245        int k = 0;
1246        while(k < 10 && !miss[activeMission].input[k].empty()) {
1247            k++;
1248        }
1249
1250        sprintf(buffer, "%d", k);
1251        SendMessage(shellSocketFD, buffer);
1252
1253        for(size_t t = 0; t < k; t++) {
1254            SendMessage(shellSocketFD, miss[activeMission].input[t].c_str());
1255            SendMessage(shellSocketFD, "0");
1256        }
1257    }
1258
1259    LOG("ServiceManagementLayer:: Done performing active mission.\n");
1260}
1261
1262
1263/* CALLED BY: MessageHandler
1264 * INPUTS: <none>
1265 * OUTPUTS: <none>
1266 *
1267 * DESCRIPTION: Print a list of the services currently registered and the ID's of the components that registered them
1268 */
1269void
1270ServiceManagementLayer::ListServices()
1271{
1272    _services_DB->command="select ";
1273    _services_DB->command.append(_services_DB->tablename);
1274    _services_DB->command.append(".* from ");
1275    _services_DB->command.append(_services_DB->tablename);
1276    _services_DB->command.append(";");
1277
1278    // Execute print (select all) command
1279    char *errorMsg;
1280    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1281    if((rc != SQLITE_OK) && (rc != 101))
1282        WARNING("SQL error: %s\n", errorMsg);
1283
1284    LOG("database %s, table %s:\n", _services_DB->filename.c_str(), _services_DB->tablename.c_str());
1285}
1286
1287/* CALLED BY: Reset
1288 * INPUTS: <none>
1289 * OUTPUTS: <none>
1290 *
1291 * DESCRIPTION: Clear and reinitialize the mission array, then reload the configuration file
1292 */
1293void
1294ServiceManagementLayer::ReloadConfiguration()
1295{
1296    LOG("ServiceManagementLayer:: Reloading Configuration.\n");
1297
1298    free(miss);
1299
1300    miss = new Mission[10];
1301    for(size_t i = 0; i < 10; i++)
1302        miss[i].services = new Service[30];
1303
1304    LoadConfiguration(_SML_Config.c_str(), miss);
1305}
1306
1307/* CALLED BY: constructor
1308 * INPUTS: |SML_Config| Address (either relitive or full) of the XML file containing mission data
1309 *        |mList| Mission array to be modified
1310 * OUTPUTS: <none>
1311 *
1312 * DESCRIPTION: IMPORTANT - See formatting instructions for correct parsing of data
1313 * Can currently handle 10 inputs and 10 outputs per service, but easily expandable
1314 * Also, can handle two layer of nested conditional statements, but could
1315 * be expanded to meet additional needs.
1316 *
1317 * Components assigned to mission during "set active mission" stage so that
1318 * components can still continue to register after the configuration is loaded
1319 */
1320void
1321ServiceManagementLayer::LoadConfiguration(const char *SML_Config, Mission* &mList)
1322{
1323    TiXmlElement *pMission;
1324    TiXmlElement *pService;
1325    TiXmlElement *pChild0, *pChild1, *pChild2, *pChild3, *pChild4;
1326    TiXmlHandle hRoot(0);
1327
1328    LOG("ServiceManagementLayer:: Loading Configuration.\n");
1329
1330    TiXmlDocument doc(".");
1331    doc.LoadFile(SML_Config);
1332    bool loadOkay = doc.LoadFile();
1333    if(!loadOkay)
1334        WARNING("Loading SML configuration failed: %s\n", SML_Config);
1335
1336    TiXmlHandle hDoc(&doc);
1337   
1338    pMission = hDoc.FirstChildElement().Element();
1339
1340    if(!pMission)
1341        WARNING("No valid root!");
1342
1343    hRoot = TiXmlHandle(pMission);
1344    pService = pMission->FirstChildElement();
1345
1346    int32_t mission_num = 0;
1347
1348    /* Iterate through the missions */
1349    for(pChild0 = pMission->FirstChildElement(); pChild0 ; pChild0 = pChild0->NextSiblingElement())  {
1350        int32_t service_num = 0;
1351        uint16_t cond_array[] = {0, 0, 0};
1352   
1353        for(pChild1  = (pChild0->FirstChildElement())->FirstChildElement(); pChild1; \
1354                pChild1  = pChild1->NextSiblingElement()) {
1355
1356            int32_t conditional_0 = service_num;
1357            for(pChild2 = pChild1->FirstChildElement(); pChild2; pChild2 = pChild2->NextSiblingElement()) {
1358                service_num++;
1359
1360                int32_t conditional_1 = service_num;
1361                for(pChild3 = pChild2->FirstChildElement(); pChild3; pChild3 = pChild3->NextSiblingElement()) {
1362                    service_num++;
1363                    int32_t conditional_2 = service_num;
1364                    for(pChild4 = pChild3->FirstChildElement(); pChild4; pChild4 = pChild4->NextSiblingElement()) {
1365                        service_num++;
1366                        if(pChild4->Attribute("name"))
1367                            mList[mission_num].services[service_num].name = pChild4->Attribute("name");
1368                        else
1369                            mList[mission_num].services[service_num].name = pChild4->Value();
1370
1371                        for(size_t i = 1; i <= 10; i++) {
1372                            char buffer[9]="input";
1373                            sprintf(buffer, "%s%d", buffer, i);
1374                            if(pChild4->Attribute(buffer))
1375                                mList[mission_num].services[service_num].input[i - 1] = pChild4->Attribute(buffer);
1376
1377                            char buffer2[9]="output";
1378                            sprintf(buffer2, "%s%d", buffer2, i);
1379                            if(pChild4->Attribute(buffer2))
1380                                mList[mission_num].services[service_num].output[i - 1] = pChild4->Attribute(buffer2);
1381                        }
1382
1383                        if(pChild4->Attribute("parameter"))
1384                            mList[mission_num].services[service_num].parameter = pChild4->Attribute("parameter");
1385
1386                        cond_array[2]++;
1387                    }
1388
1389                    if(!strcmp(pChild3->Value(), "shell") || conditional_2 != service_num) {
1390                        mList[mission_num].services[conditional_2].name = pChild3->Value();
1391                    } else {
1392                        mList[mission_num].services[service_num].name = pChild3->Attribute("name");
1393                    }
1394
1395                    for(size_t i = 1; i <= 10; i++) {
1396                        char buffer[9]="input";
1397                        sprintf(buffer, "%s%d", buffer, i);
1398                        if(pChild3->Attribute(buffer))
1399                            mList[mission_num].services[conditional_2].input[i - 1] = pChild3->Attribute(buffer);
1400
1401                        char buffer2[9]="output";
1402                        sprintf(buffer2, "%s%d", buffer2, i);
1403                        if(pChild3->Attribute(buffer2))
1404                            mList[mission_num].services[conditional_2].output[i - 1] = pChild3->Attribute(buffer2);
1405                    }
1406
1407                    if(pChild3->Attribute("parameter"))
1408                        mList[mission_num].services[conditional_2].parameter = pChild3->Attribute("parameter");
1409
1410                    mList[mission_num].services[conditional_2].num_conds = cond_array[2];
1411                    cond_array[1] += cond_array[2] + 1;
1412                    cond_array[2] = 0;
1413                }
1414
1415                if(!strcmp(pChild2->Value(), "shell") || (conditional_1 != service_num)) {
1416                    mList[mission_num].services[conditional_1].name = pChild2->Value();
1417                } else{
1418                    mList[mission_num].services[service_num].name = pChild2->Attribute("name");
1419                }
1420
1421                for(int i = 1; i <= 10; i++) {
1422                    char buffer[9]="input";
1423                    sprintf(buffer, "%s%d", buffer, i);
1424                    if(pChild2->Attribute(buffer))
1425                        mList[mission_num].services[conditional_1].input[i - 1] = pChild2->Attribute(buffer);
1426
1427                    char buffer2[9]="output";
1428                    sprintf(buffer2, "%s%d", buffer2, i);
1429                    if(pChild2->Attribute(buffer2))
1430                        mList[mission_num].services[conditional_1].output[i - 1] = pChild2->Attribute(buffer2);
1431                }
1432
1433                if(pChild2->Attribute("parameter"))
1434                    mList[mission_num].services[conditional_1].parameter = pChild2->Attribute("parameter");
1435
1436                mList[mission_num].services[conditional_1].num_conds = cond_array[1];
1437                cond_array[0] += cond_array[1] + 1;
1438                cond_array[1] = 0;
1439            }
1440       
1441            if(!strcmp(pChild1->Value(), "shell") || conditional_0 != service_num) {
1442                mList[mission_num].services[conditional_0].name = pChild1->Value();
1443            } else{
1444                mList[mission_num].services[conditional_0].name = pChild1->Attribute("name");
1445            }
1446
1447            for(size_t i = 1; i <= 10; i++) {
1448                char buffer[9]="input";
1449                sprintf(buffer, "%s%d", buffer, i);
1450                if(pChild1->Attribute(buffer))
1451                    mList[mission_num].services[conditional_0].input[i-1] = pChild1->Attribute(buffer);
1452
1453                char buffer2[9]="output";
1454                sprintf(buffer2, "%s%d", buffer2, i);
1455                if(pChild1->Attribute(buffer2))
1456                    mList[mission_num].services[conditional_0].output[i-1] = pChild1->Attribute(buffer2);
1457            }
1458
1459            if(pChild1->Attribute("parameter"))
1460                mList[mission_num].services[conditional_0].parameter = pChild1->Attribute("parameter");
1461            mList[mission_num].services[conditional_0].num_conds = cond_array[0];
1462                cond_array[0] = 0;
1463
1464            service_num++;
1465        }
1466   
1467        mList[mission_num].numServices = service_num;
1468        mList[mission_num].name = pChild0->Attribute("name");
1469        mList[mission_num].missionID = atoi(pChild0->Attribute("id"));
1470
1471        for(size_t i = 1; i <= 10; i++) {
1472            char buffer[9]="param";
1473            sprintf(buffer, "%s%d", buffer, i);
1474            if(pChild0->Attribute(buffer)){
1475                mList[mission_num].input[i-1] = pChild0->Attribute(buffer);
1476            }
1477        }
1478
1479        mission_num++;
1480    }
1481
1482    LOG("ServiceManagementLayer:: Done Loading Configuration\n");
1483}
1484
1485/* CALLED BY: MessageHandler
1486 * INPUTS: |ID| The ID number of the engine to be registered
1487 * OUTPUTS: <none>
1488 *
1489 * DESCRIPTION: Sends a registration message onto the shell and sends the ACK back to the component
1490 */
1491void
1492ServiceManagementLayer::RegisterCognitiveEngine(int32_t ID)
1493{
1494    SendMessage(shellSocketFD, "register_engine_cognitive");
1495
1496    LOG("ServiceManagementLayer:: CE registration message forwarded to shell.\n");
1497    char buffer[256];
1498    memset(buffer, 0, 256);
1499    ReadMessage(shellSocketFD, buffer);
1500    SendMessage(CE_List[ID].FD, buffer);
1501
1502    TransferRadioConfiguration(ID);
1503    memset(buffer, 0, 256);
1504    TransferExperience(ID);
1505    memset(buffer, 0, 256);
1506    numberOfCognitiveEngines++;
1507    CE_Present = true;
1508}
1509
1510/* CALLED BY: MessageHandler
1511 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1512 * OUTPUTS: <none>
1513 *
1514 * DESCRIPTION: Deletes individual services from the DB
1515 * NOTE THAT this function only needs to be called ifservice deregistration is going
1516 * to be done at a different time than component deregistration; it is handled
1517 * more efficiently and directly during that deregistration process.
1518 */
1519void
1520ServiceManagementLayer::DeregisterServices(int32_t ID)
1521{
1522    char buffer[256];
1523    memset(buffer, 0, 256);
1524    ReadMessage(CE_List[ID].FD, buffer);
1525    _services_DB->command="DELETE FROM ";
1526    _services_DB->command.append(_services_DB->tablename);
1527    _services_DB->command.append(" WHERE ID_Num IN (SELECT ");
1528
1529    char tmp[3];
1530    memset(tmp,0,3);
1531    sprintf(tmp, "%d", ID);
1532    _services_DB->command.append(tmp);
1533    _services_DB->command.append(" FROM ");
1534    _services_DB->command.append(_services_DB->tablename);
1535    _services_DB->command.append(" WHERE Service_Name");
1536    _services_DB->command.append("=='");
1537    _services_DB->command.append(buffer);
1538    _services_DB->command.append("');");
1539
1540    char *errorMsg;
1541    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1542    if((rc != SQLITE_OK) && (rc != 101))
1543        WARNING("SQL error: %s\n", errorMsg);
1544}
1545
1546/* CALLED BY: MessageHandler
1547 * INPUTS: |ID| The ID number of the engine to have it's services deregistered
1548 * OUTPUTS: <none>
1549 *
1550 * DESCRIPTION: Deletes the contact info for the cognitive engine, forwards a deregistration message to the shell
1551 * Also, deletes the services from the DB
1552 */
1553void
1554ServiceManagementLayer::DeregisterCognitiveEngine(int32_t ID)
1555{
1556    LOG("ServiceManagementLayer:: CE deregistration message forwarded to shell.\n");
1557
1558    numberOfCognitiveEngines--;
1559    if(numberOfCognitiveEngines == 0)
1560        CE_Present = false;
1561
1562    SendMessage(shellSocketFD, "deregister_engine_cognitive");
1563    char buffer[256];
1564    memset(buffer, 0, 256);
1565    ReadMessage(shellSocketFD, buffer);
1566    SendMessage(CE_List[ID].FD, buffer);
1567    if(strcmp("deregister_ack", buffer) != 0) {
1568        ERROR(1, "SML:: Failed to close CE socket\n");
1569    }
1570
1571    //Deregister the services
1572    _services_DB->command="DELETE FROM ";
1573    _services_DB->command.append(_services_DB->tablename);
1574    _services_DB->command.append(" WHERE ");
1575    _services_DB->command.append("ID_Num");
1576    _services_DB->command.append("==");
1577
1578    char tmp[3];
1579    memset(tmp,0,3);
1580    sprintf(tmp, "%d", ID);
1581    _services_DB->command.append(tmp);
1582    _services_DB->command.append(";");
1583
1584    char *errorMsg;
1585    int32_t rc = sqlite3_exec(_services_DB->db, _services_DB->command.c_str(), callback, 0, &errorMsg);
1586    if((rc != SQLITE_OK) && (rc != 101))
1587        WARNING("SQL error: %s\n", errorMsg);
1588
1589    CE_List[ID].FD = -1;
1590    CE_List[ID].ID_num = -1;
1591
1592    LOG("Cognitive Radio Shell:: CE Socket closed for engine #%d.\n", ID);
1593}
1594
1595
1596/* CALLED BY: test class
1597 * INPUTS: <none>
1598 * OUTPUTS: <none>
1599 *
1600 * DESCRIPTION: Sets up a server socket and listens for communication on either that or the shell socket
1601 */
1602void
1603ServiceManagementLayer::StartSMLServer()
1604{
1605    struct timeval selTimeout;
1606    int32_t running = 1;
1607    int32_t port, rc, new_sd = 1;
1608    int32_t desc_ready = 1;
1609    fd_set sockSet, shellSet;
1610
1611    cogEngSrv = CreateTCPServerSocket(SMLport);
1612    int32_t maxDescriptor = cogEngSrv;
1613
1614    if(InitializeTCPServerPort(cogEngSrv) == -1)
1615        ERROR(1,"Error initializing primary port\n");
1616
1617    while (running) {
1618        /* Zero socket descriptor vector and set for server sockets */
1619        /* This must be reset every time select() is called */
1620        FD_ZERO(&sockSet);
1621        FD_SET(cogEngSrv, &sockSet);
1622       
1623        for(uint16_t k = 0; k < Current_ID; k++){
1624            if(CE_List[k].ID_num != -1)
1625                FD_SET(CE_List[k].FD, &sockSet);
1626        }
1627
1628        /* Timeout specification */
1629        /* This must be reset every time select() is called */
1630        selTimeout.tv_sec = 0;      /* timeout (secs.) */
1631        selTimeout.tv_usec = 0;     /* 0 microseconds */
1632
1633        /* Changed both to zero so that select will check messages from the shell
1634         * instead of blocking when there is no command from the CE's to be processed */
1635
1636        /* Check ifthere is a message on the socket waiting to be read */
1637        rc = select(maxDescriptor + 1, &sockSet, NULL, NULL, &selTimeout);
1638        if(rc == 0) {
1639            FD_ZERO(&shellSet);
1640            FD_SET(shellSocketFD, &shellSet);
1641            selTimeout.tv_sec = 0;
1642            selTimeout.tv_usec = 0;
1643
1644            /* Check if there is a message on the shell socket ready to be processed */
1645            select(shellSocketFD + 1, &shellSet, NULL, NULL, &selTimeout);
1646            if(FD_ISSET(shellSocketFD, &shellSet)){
1647                MessageHandler(-1);}
1648        } else {
1649            desc_ready = rc;
1650            for(port = 0; port <= maxDescriptor && desc_ready > 0; port++) {
1651                if(FD_ISSET(port, &sockSet)) {
1652                    desc_ready -= 1;
1653
1654                    /* Check ifrequest is new or on an existing open descriptor */
1655                    if(port == cogEngSrv) {
1656                        /* If new, assign it a descriptor and give it an ID */
1657                        new_sd = AcceptTCPConnection(port);
1658             
1659                        if(new_sd < 0)
1660                            break;
1661
1662                        CE_List[Current_ID].FD = new_sd;
1663                        CE_List[Current_ID].ID_num = Current_ID;
1664                        MessageHandler(Current_ID);
1665                        Current_ID++;
1666   
1667                        FD_SET(new_sd,&sockSet);
1668                        if(new_sd > maxDescriptor)
1669                           maxDescriptor = new_sd;
1670                    } else {
1671                        /* If old, figure out which ID it coresponds to and handle it accordingly */
1672                        for(size_t z = 0; z < Current_ID; z++) {
1673                            if(CE_List[z].FD == port) {
1674                                MessageHandler(z);
1675                            }
1676                        }
1677                    }
1678                }
1679            }
1680        }
1681    }       
1682
1683    /* Close sockets */
1684    close(cogEngSrv);
1685
1686    return;
1687}
1688
Note: See TracBrowser for help on using the browser.