00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
#include "module.h"
#include <pthread.h>
#include <sstream>
#include <vector>
00033
00034 namespace module_webservice
00035 {
00036
00037
00038 PyObject* CommandWebservice::pythonService(PyObject* self, PyObject* args)
00039 {
00040 Py_BEGIN_ALLOW_THREADS
00041 try {
00042 CommandWebservice().execute();
00043 }
00044 catch (...)
00045 {
00046 Py_BLOCK_THREADS;
00047 PythonType::evalException();
00048 return NULL;
00049 }
00050 Py_END_ALLOW_THREADS
00051 return Py_BuildValue("");
00052 }
00053
00054
00055 void CommandWebservice::execute()
00056 {
00057
00058 thread_data threadinfo[threads];
00059 struct soap soap;
00060 soap_init(&soap);
00061 SOAP_SOCKET mastersocket, slavesocket;
00062
00063
00064 mastersocket = soap_bind(&soap, NULL, port, BACKLOG);
00065 if (!soap_valid_socket(mastersocket))
00066 throw RuntimeException("Can't bind to port " + port);
00067
00068
00069 pthread_mutex_init(&queue_cs, NULL);
00070 pthread_cond_init(&queue_cv, NULL);
00071 for (int i = 0; i < threads; i++)
00072 {
00073 threadinfo[i].master = this;
00074 threadinfo[i].index = i;
00075 threadinfo[i].soap_thr = soap_copy(&soap);
00076 pthread_create(&threadinfo[i].tid, NULL, (void*(*)(void*))process_queue, static_cast<void*>(&threadinfo[i]));
00077 }
00078
00079
00080 for (;;)
00081 {
00082
00083 slavesocket = soap_accept(&soap);
00084 if (!soap_valid_socket(slavesocket))
00085 {
00086 if (soap.errnum)
00087 {
00088 soap_print_fault(&soap, stderr);
00089 continue;
00090 }
00091 else
00092 {
00093 logger << "Server timed out" << endl;
00094 break;
00095 }
00096 }
00097 logger << "Connection from " << ((soap.ip >> 24)&0xFF) << "."
00098 << ((soap.ip >> 16)&0xFF) << "." << ((soap.ip >> 8)&0xFF) << "."
00099 << (soap.ip&0xFF) << endl;
00100
00101
00102 while (enqueue(slavesocket) == SOAP_EOM)
00103 sleep(1);
00104 }
00105
00106
00107 for (int i = 0; i < threads; i++)
00108 {
00109
00110 while (enqueue(SOAP_INVALID_SOCKET) == SOAP_EOM)
00111 sleep(1);
00112 }
00113
00114
00115 for (int i = 0; i < threads; i++)
00116 {
00117 pthread_join(threadinfo[i].tid, NULL);
00118 soap_done(threadinfo[i].soap_thr);
00119 free(threadinfo[i].soap_thr);
00120 }
00121
00122
00123 pthread_mutex_destroy(&queue_cs);
00124 pthread_cond_destroy(&queue_cv);
00125 soap_done(&soap);
00126 }
00127
00128
00129 void* CommandWebservice::process_queue(void *soap)
00130 {
00131 struct thread_data *mydata = (struct thread_data*)soap;
00132
00133 for (;;)
00134 {
00135
00136 mydata->soap_thr->socket = mydata->master->dequeue();
00137
00138
00139 if (!soap_valid_socket(mydata->soap_thr->socket)) break;
00140
00141
00142 soap_serve(mydata->soap_thr);
00143 soap_destroy(mydata->soap_thr);
00144 soap_end(mydata->soap_thr);
00145 }
00146 return NULL;
00147 }
00148
00149
00150 int CommandWebservice::enqueue(SOAP_SOCKET sock)
00151 {
00152 int status = SOAP_OK;
00153 int next;
00154 pthread_mutex_lock(&queue_cs);
00155 next = tail + 1;
00156 if (next >= MAX_QUEUE)
00157 next = 0;
00158 if (next == head)
00159 status = SOAP_EOM;
00160 else
00161 {
00162 queue[tail] = sock;
00163 tail = next;
00164 }
00165 pthread_cond_signal(&queue_cv);
00166 pthread_mutex_unlock(&queue_cs);
00167 return status;
00168 }
00169
00170
00171 SOAP_SOCKET CommandWebservice::dequeue()
00172 {
00173 SOAP_SOCKET sock;
00174 pthread_mutex_lock(&queue_cs);
00175 while (head == tail) pthread_cond_wait(&queue_cv, &queue_cs);
00176 sock = queue[head++];
00177 if (head >= MAX_QUEUE)
00178 head = 0;
00179 pthread_mutex_unlock(&queue_cs);
00180 return sock;
00181 }
00182
00183 }