OpenSync
0.22
|
00001 /* 00002 * libosengine - A synchronization engine for the opensync framework 00003 * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org> 00004 * 00005 * This library is free software; you can redistribute it and/or 00006 * modify it under the terms of the GNU Lesser General Public 00007 * License as published by the Free Software Foundation; either 00008 * version 2.1 of the License, or (at your option) any later version. 00009 * 00010 * This library is distributed in the hope that it will be useful, 00011 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00012 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 00013 * Lesser General Public License for more details. 00014 * 00015 * You should have received a copy of the GNU Lesser General Public 00016 * License along with this library; if not, write to the Free Software 00017 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 00018 * 00019 */ 00020 00021 #include "config.h" 00022 #include "engine.h" 00023 #include <glib.h> 00024 #include <opensync/opensync_support.h> 00025 #include "opensync/opensync_format_internals.h" 00026 #include "opensync/opensync_member_internals.h" 00027 #include "opensync/opensync_message_internals.h" 00028 #include "opensync/opensync_queue_internals.h" 00029 00030 #include "engine_internals.h" 00031 #include <unistd.h> 00032 00033 #include <sys/types.h> 00034 #include <sys/wait.h> 00035 #include <errno.h> 00036 #include <signal.h> 00037 00043 void _get_changes_reply_receiver(OSyncMessage *message, OSyncClient *sender) 00044 { 00045 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender); 00046 OSyncEngine *engine = sender->engine; 00047 00048 if (osync_message_is_error(message)) { 00049 OSyncError *error = NULL; 00050 osync_demarshal_error(message, &error); 00051 osync_error_duplicate(&engine->error, &error); 00052 osync_debug("ENG", 1, "Get changes command reply was a error: %s", osync_error_print(&error)); 00053 osync_status_update_member(engine, sender, MEMBER_GET_CHANGES_ERROR, &error); 00054 osync_error_update(&engine->error, "Unable to read from one of the members"); 00055 osync_flag_unset(sender->fl_sent_changes); 00056 //osync_flag_set(sender->fl_finished); 00057 osync_flag_set(sender->fl_done); 00058 /* 00059 * FIXME: For now we want to stop the engine if 00060 * one of the member didnt connect yet. Later it should 00061 * be that if >= 2 members connect, the sync should continue 00062 */ 00063 osync_flag_set(engine->fl_stop); 00064 00065 } else { 00066 osync_status_update_member(engine, sender, MEMBER_SENT_CHANGES, NULL); 00067 osync_flag_set(sender->fl_sent_changes); 00068 } 00069 00070 osengine_client_decider(engine, sender); 00071 osync_trace(TRACE_EXIT, "_get_changes_reply_receiver"); 00072 } 00073 00079 void _connect_reply_receiver(OSyncMessage *message, OSyncClient *sender) 00080 { 00081 osync_trace(TRACE_ENTRY, "_connect_reply_receiver(%p, %p)", message, sender); 00082 00083 osync_trace(TRACE_INTERNAL, "connect reply %i", osync_message_is_error(message)); 00084 OSyncEngine *engine = sender->engine; 00085 00086 if (osync_message_is_error(message)) { 00087 OSyncError *error = NULL; 00088 osync_demarshal_error(message, &error); 00089 osync_error_duplicate(&engine->error, &error); 00090 osync_debug("ENG", 1, "Connect command reply was a error: %s", osync_error_print(&error)); 00091 osync_status_update_member(engine, sender, MEMBER_CONNECT_ERROR, &error); 00092 osync_error_update(&engine->error, "Unable to connect one of the members"); 00093 osync_flag_unset(sender->fl_connected); 00094 osync_flag_set(sender->fl_finished); 00095 osync_flag_set(sender->fl_sent_changes); 00096 osync_flag_set(sender->fl_done); 00097 /* 00098 * FIXME: For now we want to stop the engine if 00099 * one of the member didnt connect yet. Later it should 00100 * be that if >= 2 members connect, the sync should continue 00101 */ 00102 osync_flag_set(engine->fl_stop); 00103 00104 } else { 00105 osync_member_read_sink_info(sender->member, message); 00106 00107 osync_status_update_member(engine, sender, MEMBER_CONNECTED, NULL); 00108 osync_flag_set(sender->fl_connected); 00109 } 00110 00111 osengine_client_decider(engine, sender); 00112 osync_trace(TRACE_EXIT, "_connect_reply_receiver"); 00113 } 00114 00115 void _sync_done_reply_receiver(OSyncMessage *message, OSyncClient *sender) 00116 { 00117 osync_trace(TRACE_ENTRY, "_sync_done_reply_receiver(%p, %p)", message, sender); 00118 00119 OSyncEngine *engine = sender->engine; 00120 00121 if (osync_message_is_error(message)) { 00122 OSyncError *error = NULL; 00123 osync_demarshal_error(message, &error); 00124 osync_error_duplicate(&engine->error, &error); 00125 osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error)); 00126 osync_status_update_member(engine, sender, MEMBER_SYNC_DONE_ERROR, &error); 00127 osync_error_update(&engine->error, "Unable to finish the sync for one of the members"); 00128 } 00129 00130 osync_flag_set(sender->fl_done); 00131 osengine_client_decider(engine, sender); 00132 osync_trace(TRACE_EXIT, "_sync_done_reply_receiver"); 00133 } 00134 00135 void _committed_all_reply_receiver(OSyncMessage *message, OSyncClient *sender) 00136 { 00137 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, message, sender); 00138 00139 OSyncEngine *engine = sender->engine; 00140 00141 if (osync_message_is_error(message)) { 00142 OSyncError *error = NULL; 00143 osync_demarshal_error(message, &error); 00144 osync_error_duplicate(&engine->error, &error); 00145 osync_debug("ENG", 1, "Committed all command reply was a error: %s", osync_error_print(&error)); 00146 osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL_ERROR, &error); 00147 osync_error_update(&engine->error, "Unable to write changes to one of the members"); 00148 } else 00149 osync_status_update_member(engine, sender, MEMBER_COMMITTED_ALL, NULL); 00150 00151 osync_flag_set(sender->fl_committed_all); 00152 osengine_client_decider(engine, sender); 00153 osync_trace(TRACE_EXIT, "%s", __func__); 00154 } 00155 00156 void _disconnect_reply_receiver(OSyncMessage *message, OSyncClient *sender) 00157 { 00158 osync_trace(TRACE_ENTRY, "_disconnect_reply_receiver(%p, %p)", message, sender); 00159 00160 OSyncEngine *engine = sender->engine; 00161 00162 if (osync_message_is_error(message)) { 00163 OSyncError *error = NULL; 00164 osync_demarshal_error(message, &error); 00165 osync_debug("ENG", 1, "Sync done command reply was a error: %s", osync_error_print(&error)); 00166 osync_status_update_member(engine, sender, MEMBER_DISCONNECT_ERROR, &error); 00167 } else 00168 osync_status_update_member(engine, sender, MEMBER_DISCONNECTED, NULL); 00169 00170 osync_flag_unset(sender->fl_connected); 00171 osync_flag_set(sender->fl_finished); 00172 osengine_client_decider(engine, sender); 00173 osync_trace(TRACE_EXIT, "_disconnect_reply_receiver"); 00174 } 00175 00176 void _get_change_data_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry) 00177 { 00178 osync_trace(TRACE_ENTRY, "_get_change_data_reply_receiver(%p, %p, %p)", message, entry); 00179 OSyncEngine *engine = entry->client->engine; 00180 00181 if (osync_message_is_error(message)) { 00182 OSyncError *error = NULL; 00183 osync_demarshal_error(message, &error); 00184 osync_error_duplicate(&engine->error, &error); 00185 osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error)); 00186 osync_status_update_change(engine, entry->change, CHANGE_RECV_ERROR, &error); 00187 osync_error_update(&engine->error, "Unable to read one or more objects"); 00188 00189 //FIXME Do we need to do anything here? 00190 //osync_flag_unset(entry->fl_has_data); 00191 } else { 00192 00193 osync_demarshal_changedata(message, entry->change); 00194 00195 osync_flag_set(entry->fl_has_data); 00196 osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL); 00197 } 00198 00199 osync_change_save(entry->change, TRUE, NULL); 00200 osengine_mappingentry_decider(engine, entry); 00201 osync_trace(TRACE_EXIT, "_get_change_data_reply_receiver"); 00202 } 00203 00204 void _read_change_reply_receiver(OSyncClient *sender, OSyncMessage *message, OSyncEngine *engine) 00205 { 00206 osync_trace(TRACE_ENTRY, "_read_change_reply_receiver(%p, %p, %p)", sender, message, engine); 00207 00208 /*OSyncMappingEntry *entry = osync_message_get_data(message, "entry"); 00209 00210 osync_flag_detach(entry->fl_read); 00211 00212 osync_flag_unset(entry->mapping->fl_solved); 00213 osync_flag_unset(entry->mapping->fl_chkconflict); 00214 osync_flag_unset(entry->mapping->fl_multiplied); 00215 00216 if (osync_change_get_changetype(entry->change) == CHANGE_DELETED) 00217 osync_flag_set(entry->fl_deleted); 00218 00219 osync_flag_set(entry->fl_has_info); 00220 osync_flag_unset(entry->fl_synced); 00221 00222 osync_change_save(entry->change, TRUE, NULL); 00223 00224 osync_status_update_change(engine, entry->change, CHANGE_RECEIVED, NULL); 00225 00226 osengine_mappingentry_decider(engine, entry);*/ 00227 osync_trace(TRACE_EXIT, "_read_change_reply_receiver"); 00228 } 00229 00230 void _commit_change_reply_receiver(OSyncMessage *message, OSyncMappingEntry *entry) 00231 { 00232 osync_trace(TRACE_ENTRY, "_commit_change_reply_receiver(%p, %p)", message, entry); 00233 OSyncEngine *engine = entry->client->engine; 00234 00235 if (osync_message_is_error(message)) { 00236 OSyncError *error = NULL; 00237 osync_demarshal_error(message, &error); 00238 osync_error_duplicate(&engine->error, &error); 00239 osync_debug("MAP", 1, "Commit change command reply was a error: %s", osync_error_print(&error)); 00240 osync_status_update_change(engine, entry->change, CHANGE_WRITE_ERROR, &error); 00241 OSyncError *maperror = NULL; 00242 osync_error_duplicate(&maperror, &error); 00243 osync_status_update_mapping(engine, entry->mapping, MAPPING_WRITE_ERROR, &maperror); 00244 osync_error_update(&engine->error, "Unable to write one or more objects"); 00245 00246 //FIXME Do we need to do anything here? 00247 osync_flag_unset(entry->fl_dirty); 00248 osync_flag_set(entry->fl_synced); 00249 } else { 00250 /* The plugin may have generated a new UID after committing the change. The commit 00251 * change reply will return the new UID of the change 00252 */ 00253 00254 char *newuid; 00255 osync_message_read_string(message, &newuid); 00256 osync_change_set_uid(entry->change, newuid); 00257 00258 osync_status_update_change(engine, entry->change, CHANGE_SENT, NULL); 00259 osync_flag_unset(entry->fl_dirty); 00260 osync_flag_set(entry->fl_synced); 00261 } 00262 00263 if (osync_change_get_changetype(entry->change) == CHANGE_DELETED) 00264 osync_flag_set(entry->fl_deleted); 00265 00266 osync_change_reset(entry->change); 00267 00268 OSyncError *error = NULL; 00269 osync_change_save(entry->change, TRUE, &error); 00270 00271 osengine_mappingentry_decider(engine, entry); 00272 osync_trace(TRACE_EXIT, "_commit_change_reply_receiver"); 00273 } 00274 00275 OSyncClient *osync_client_new(OSyncEngine *engine, OSyncMember *member, OSyncError **error) 00276 { 00277 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, member, error); 00278 OSyncClient *client = osync_try_malloc0(sizeof(OSyncClient), error); 00279 if (!client) 00280 goto error; 00281 00282 client->member = member; 00283 osync_member_set_data(member, client); 00284 client->engine = engine; 00285 engine->clients = g_list_append(engine->clients, client); 00286 00287 char *name = g_strdup_printf("%s/pluginpipe", osync_member_get_configdir(member)); 00288 client->commands_to_osplugin = osync_queue_new(name, error); 00289 g_free(name); 00290 00291 name = g_strdup_printf("%s/enginepipe", osync_member_get_configdir(member)); 00292 client->commands_from_osplugin = osync_queue_new(name, error); 00293 g_free(name); 00294 00295 if (!client->commands_to_osplugin || !client->commands_from_osplugin) 00296 goto error_free_client; 00297 00298 client->fl_connected = osync_flag_new(engine->cmb_connected); 00299 client->fl_sent_changes = osync_flag_new(engine->cmb_sent_changes); 00300 client->fl_done = osync_flag_new(NULL); 00301 client->fl_committed_all = osync_flag_new(engine->cmb_committed_all_sent); 00302 client->fl_finished = osync_flag_new(engine->cmb_finished); 00303 00304 osync_trace(TRACE_EXIT, "%s: %p", __func__, client); 00305 return client; 00306 00307 error_free_client: 00308 g_free(client); 00309 error: 00310 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00311 return NULL; 00312 } 00313 00314 void osync_client_reset(OSyncClient *client) 00315 { 00316 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client); 00317 osync_flag_set_state(client->fl_connected, FALSE); 00318 osync_flag_set_state(client->fl_sent_changes, FALSE); 00319 osync_flag_set_state(client->fl_done, FALSE); 00320 osync_flag_set_state(client->fl_finished, FALSE); 00321 osync_flag_set_state(client->fl_committed_all, FALSE); 00322 osync_trace(TRACE_EXIT, "%s", __func__); 00323 } 00324 00325 void osync_client_free(OSyncClient *client) 00326 { 00327 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, client); 00328 osync_queue_free(client->commands_to_osplugin); 00329 osync_queue_free(client->commands_from_osplugin); 00330 00331 osync_flag_free(client->fl_connected); 00332 osync_flag_free(client->fl_sent_changes); 00333 osync_flag_free(client->fl_done); 00334 osync_flag_free(client->fl_finished); 00335 osync_flag_free(client->fl_committed_all); 00336 00337 g_free(client); 00338 osync_trace(TRACE_EXIT, "%s", __func__); 00339 } 00340 00341 void *osync_client_message_sink(OSyncMember *member, const char *name, void *data, osync_bool synchronous) 00342 { 00343 OSyncClient *client = osync_member_get_data(member); 00344 OSyncEngine *engine = client->engine; 00345 if (!synchronous) { 00346 /*OSyncMessage *message = itm_message_new_signal(client, "PLUGIN_MESSAGE"); 00347 osync_debug("CLI", 3, "Sending message %p PLUGIN_MESSAGE for message %s", message, name); 00348 itm_message_set_data(message, "data", data); 00349 itm_message_set_data(message, "name", g_strdup(name)); 00350 itm_queue_send(engine->incoming, message);*/ 00351 return NULL; 00352 } else { 00353 return engine->plgmsg_callback(engine, client, name, data, engine->plgmsg_userdata); 00354 } 00355 } 00356 00357 OSyncPluginTimeouts osync_client_get_timeouts(OSyncClient *client) 00358 { 00359 return osync_plugin_get_timeouts(osync_member_get_plugin(client->member)); 00360 } 00361 00362 void osync_client_call_plugin(OSyncClient *client, char *function, void *data, OSyncPluginReplyHandler replyhandler, void *userdata) 00363 { 00364 osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p, %p)", __func__, client, function, data, replyhandler, userdata); 00365 00366 /*OSyncEngine *engine = client->engine; 00367 ITMessage *message = itm_message_new_methodcall(engine, "CALL_PLUGIN"); 00368 itm_message_set_data(message, "data", data); 00369 itm_message_set_data(message, "function", g_strdup(function)); 00370 00371 if (replyhandler) { 00372 OSyncPluginCallContext *ctx = g_malloc0(sizeof(OSyncPluginCallContext)); 00373 ctx->handler = replyhandler; 00374 ctx->userdata = userdata; 00375 itm_message_set_handler(message, engine->incoming, (ITMessageHandler)_recv_plugin_answer, ctx); 00376 00377 itm_message_set_data(message, "want_reply", GINT_TO_POINTER(1)); 00378 } else 00379 itm_message_set_data(message, "want_reply", GINT_TO_POINTER(0)); 00380 00381 itm_queue_send(client->incoming, message);*/ 00382 00383 osync_trace(TRACE_EXIT, "%s", __func__); 00384 } 00385 00386 osync_bool osync_client_get_changes(OSyncClient *target, OSyncEngine *sender, OSyncError **error) 00387 { 00388 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error); 00389 00390 osync_flag_changing(target->fl_sent_changes); 00391 00392 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGES, 0, error); 00393 if (!message) 00394 goto error; 00395 00396 osync_message_set_handler(message, (OSyncMessageHandler)_get_changes_reply_receiver, target); 00397 00398 osync_member_write_sink_info(target->member, message); 00399 00400 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target); 00401 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_changeinfo_timeout, error)) 00402 goto error_free_message; 00403 00404 osync_message_unref(message); 00405 00406 osync_trace(TRACE_EXIT, "%s", __func__); 00407 return TRUE; 00408 00409 error_free_message: 00410 osync_message_unref(message); 00411 error: 00412 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00413 return FALSE; 00414 } 00415 00416 osync_bool osync_client_get_change_data(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error) 00417 { 00418 osync_flag_changing(entry->fl_has_data); 00419 00420 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_GET_CHANGEDATA, 0, error); 00421 if (!message) 00422 goto error; 00423 00424 osync_message_set_handler(message, (OSyncMessageHandler)_get_change_data_reply_receiver, entry); 00425 00426 osync_marshal_change(message, entry->change); 00427 00428 osync_debug("ENG", 3, "Sending get_changedata message %p to client %p", message, entry->client); 00429 00430 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target); 00431 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.get_data_timeout, error)) 00432 goto error_free_message; 00433 00434 osync_message_unref(message); 00435 00436 osync_trace(TRACE_EXIT, "%s", __func__); 00437 return TRUE; 00438 00439 error_free_message: 00440 osync_message_unref(message); 00441 error: 00442 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00443 return FALSE; 00444 } 00445 00446 /*void osync_client_read_change(OSyncEngine *sender, OSyncMappingEntry *entry) 00447 { 00448 //osync_flag_changing(entry->fl_has_data); 00449 OSyncMessage *message = osync_message_new_methodcall(sender, "READ_CHANGE"); 00450 osync_message_set_handler(message, sender->incoming, (OSyncMessageHandler)_read_change_reply_receiver, sender); 00451 osync_message_set_data(message, "change", entry->change); 00452 osync_message_set_data(message, "entry", entry); 00453 osync_debug("ENG", 3, "Sending read_change message %p to client %p", message, entry->client); 00454 00455 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client); 00456 osync_queue_send_with_timeout(entry->client->incoming, message, timeouts.read_change_timeout, sender); 00457 }*/ 00458 00459 osync_bool osync_client_connect(OSyncClient *target, OSyncEngine *sender, OSyncError **error) 00460 { 00461 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error); 00462 00463 osync_flag_changing(target->fl_connected); 00464 00465 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_CONNECT, 0, error); 00466 if (!message) 00467 goto error; 00468 00469 osync_member_write_sink_info(target->member, message); 00470 00471 osync_message_set_handler(message, (OSyncMessageHandler)_connect_reply_receiver, target); 00472 00473 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target); 00474 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.connect_timeout, error)) 00475 goto error_free_message; 00476 00477 osync_message_unref(message); 00478 00479 osync_trace(TRACE_EXIT, "%s", __func__); 00480 return TRUE; 00481 00482 error_free_message: 00483 osync_message_unref(message); 00484 error: 00485 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00486 return FALSE; 00487 } 00488 00489 osync_bool osync_client_commit_change(OSyncClient *target, OSyncEngine *sender, OSyncMappingEntry *entry, OSyncError **error) 00490 { 00491 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, entry); 00492 osync_trace(TRACE_INTERNAL, "Committing change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", osync_change_get_uid(entry->change), osync_change_get_changetype(entry->change), osync_change_get_data(entry->change), osync_change_get_datasize(entry->change), osync_change_get_objtype(entry->change) ? osync_objtype_get_name(osync_change_get_objtype(entry->change)) : "None", osync_change_get_objformat(entry->change) ? osync_objformat_get_name(osync_change_get_objformat(entry->change)) : "None", osync_member_get_id(entry->client->member)); 00493 00494 osync_flag_changing(entry->fl_dirty); 00495 00496 // convert the data to the format accepted by the member 00497 if (!osync_change_convert_member_sink(osync_group_get_format_env(sender->group), entry->change, target->member, error)) 00498 goto error; 00499 00500 if (osync_change_get_changetype(entry->change) == CHANGE_ADDED) { 00501 int elevated = 0; 00502 // Generate a new UID, if necessary 00503 OSyncMappingView *view = osengine_mappingtable_find_view(sender->maptable, target->member); 00504 while (!osengine_mappingview_uid_is_unique(view, entry, TRUE)) { 00505 if (!osync_change_elevate(sender, entry->change, 1)) 00506 break; 00507 elevated++; 00508 } 00509 00510 if (elevated) { 00511 // Save the newly generated UID 00512 if (!osync_change_save(entry->change, TRUE, error)) 00513 goto error; 00514 } 00515 } 00516 00517 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMIT_CHANGE, 0, error); 00518 if (!message) 00519 goto error; 00520 00521 osync_marshal_change(message, entry->change); 00522 00523 osync_message_set_handler(message, (OSyncMessageHandler)_commit_change_reply_receiver, entry); 00524 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(entry->client); 00525 00526 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.commit_timeout, error)) 00527 goto error_free_message; 00528 00529 osync_message_unref(message); 00530 00531 g_assert(osync_flag_is_attached(entry->fl_committed) == TRUE); 00532 osync_flag_detach(entry->fl_committed); 00533 00534 osync_trace(TRACE_EXIT, "%s", __func__); 00535 return TRUE; 00536 00537 error_free_message: 00538 osync_message_unref(message); 00539 error: 00540 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00541 return FALSE; 00542 } 00543 00544 osync_bool osync_client_sync_done(OSyncClient *target, OSyncEngine *sender, OSyncError **error) 00545 { 00546 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, target, sender, error); 00547 00548 osync_flag_changing(target->fl_done); 00549 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNC_DONE, 0, error); 00550 if (!message) 00551 goto error; 00552 00553 osync_message_set_handler(message, (OSyncMessageHandler)_sync_done_reply_receiver, target); 00554 00555 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target); 00556 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.sync_done_timeout, error)) 00557 goto error_free_message; 00558 00559 osync_message_unref(message); 00560 00561 osync_trace(TRACE_EXIT, "%s", __func__); 00562 return TRUE; 00563 00564 error_free_message: 00565 osync_message_unref(message); 00566 error: 00567 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00568 return FALSE; 00569 } 00570 00571 osync_bool osync_client_committed_all(OSyncClient *target, OSyncEngine *sender, OSyncError **error) 00572 { 00573 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender); 00574 00575 osync_flag_changing(target->fl_committed_all); 00576 00577 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_COMMITTED_ALL, 0, error); 00578 if (!message) 00579 goto error; 00580 00581 osync_message_set_handler(message, (OSyncMessageHandler)_committed_all_reply_receiver, target); 00582 00583 //OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target); 00584 /*FIXME: Add timeout to committed_all message */ 00585 if (!osync_queue_send_message(target->commands_to_osplugin, target->commands_from_osplugin, message, error)) 00586 goto error_free_message; 00587 00588 osync_message_unref(message); 00589 00590 osync_trace(TRACE_EXIT, "%s", __func__); 00591 return TRUE; 00592 00593 error_free_message: 00594 osync_message_unref(message); 00595 error: 00596 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00597 return FALSE; 00598 } 00599 00600 osync_bool osync_client_disconnect(OSyncClient *target, OSyncEngine *sender, OSyncError **error) 00601 { 00602 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, target, sender); 00603 00604 osync_flag_changing(target->fl_connected); 00605 00606 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_DISCONNECT, 0, error); 00607 if (!message) 00608 goto error; 00609 00610 osync_message_set_handler(message, (OSyncMessageHandler)_disconnect_reply_receiver, target); 00611 00612 OSyncPluginTimeouts timeouts = osync_client_get_timeouts(target); 00613 if (!osync_queue_send_message_with_timeout(target->commands_to_osplugin, target->commands_from_osplugin, message, timeouts.disconnect_timeout, error)) 00614 goto error_free_message; 00615 00616 osync_message_unref(message); 00617 00618 osync_trace(TRACE_EXIT, "%s", __func__); 00619 return TRUE; 00620 00621 error_free_message: 00622 osync_message_unref(message); 00623 error: 00624 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00625 return FALSE; 00626 } 00627 00628 00629 /* 00630 void osync_client_call_plugin_with_reply(OSyncClient *client, char *function, void *data, void ( *replyhandler)(OSyncEngine *, OSyncClient *, void *, OSyncError *), int timeout) 00631 { 00632 OSyncEngine *engine = client->engine; 00633 ITMessage *message = itm_message_new_signal(engine, "CALL_PLUGIN"); 00634 osync_debug("CLI", 3, "Sending message %p CALL_PLUGIN for function %s", message, function); 00635 itm_message_set_data(message, "data", data); 00636 itm_message_set_data(message, "function", g_strdup(function)); 00637 itm_queue_send_with_reply(client->incoming, message); 00638 }*/ 00639 00640 char *osync_client_pid_filename(OSyncClient *client) 00641 { 00642 return g_strdup_printf("%s/osplugin.pid", client->member->configdir); 00643 } 00644 00645 osync_bool osync_client_remove_pidfile(OSyncClient *client, OSyncError **error) 00646 { 00647 osync_bool ret = FALSE; 00648 char *pidpath = osync_client_pid_filename(client); 00649 00650 if (unlink(pidpath) < 0) { 00651 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't remove pid file: %s", strerror(errno)); 00652 goto out_free_path; 00653 } 00654 00655 /* Success */ 00656 ret = TRUE; 00657 00658 out_free_path: 00659 g_free(pidpath); 00660 //out: 00661 return ret; 00662 } 00663 00664 osync_bool osync_client_create_pidfile(OSyncClient *client, OSyncError **error) 00665 { 00666 osync_bool ret = FALSE; 00667 char *pidpath = osync_client_pid_filename(client); 00668 char *pidstr = g_strdup_printf("%ld", (long)client->child_pid); 00669 00670 if (!osync_file_write(pidpath, pidstr, strlen(pidstr), 0644, error)) 00671 goto out_free_pidstr; 00672 00673 /* Success */ 00674 ret = TRUE; 00675 00676 out_free_pidstr: 00677 g_free(pidstr); 00678 //out_free_path: 00679 g_free(pidpath); 00680 //out: 00681 return ret; 00682 } 00683 00684 osync_bool osync_client_kill_old_osplugin(OSyncClient *client, OSyncError **error) 00685 { 00686 osync_bool ret = FALSE; 00687 00688 char *pidstr; 00689 int pidlen; 00690 pid_t pid; 00691 00692 char *pidpath = osync_client_pid_filename(client); 00693 00694 /* Simply returns if there is no PID file */ 00695 if (!g_file_test(pidpath, G_FILE_TEST_EXISTS)) { 00696 ret = TRUE; 00697 goto out_free_path; 00698 } 00699 00700 if (!osync_file_read(pidpath, &pidstr, &pidlen, error)) 00701 goto out_free_path; 00702 00703 pid = atol(pidstr); 00704 if (!pid) 00705 goto out_free_str; 00706 00707 osync_trace(TRACE_INTERNAL, "Killing old osplugin process. PID: %ld", (long)pid); 00708 00709 if (kill(pid, SIGTERM) < 0) { 00710 osync_trace(TRACE_INTERNAL, "Error killing old osplugin: %s. Stale pid file?", strerror(errno)); 00711 /* Don't return failure if kill() failed, because it may be a stale pid file */ 00712 } 00713 00714 int count = 0; 00715 while (osync_queue_is_alive(client->commands_to_osplugin)) { 00716 if (count++ > 10) { 00717 osync_trace(TRACE_INTERNAL, "Killing old osplugin process with SIGKILL"); 00718 kill(pid, SIGKILL); 00719 break; 00720 } 00721 osync_trace(TRACE_INTERNAL, "Waiting for other side to terminate"); 00722 /*FIXME: Magic numbers are evil */ 00723 usleep(500000); 00724 } 00725 00726 if (unlink(pidpath) < 0) { 00727 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't erase PID file: %s", strerror(errno)); 00728 goto out_free_str; 00729 } 00730 00731 /* Success */ 00732 ret = TRUE; 00733 00734 out_free_str: 00735 g_free(pidstr); 00736 out_free_path: 00737 g_free(pidpath); 00738 //out: 00739 return ret; 00740 } 00741 00742 00743 osync_bool osync_client_spawn(OSyncClient *client, OSyncEngine *engine, OSyncError **error) 00744 { 00745 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error); 00746 00747 int waiting = 0; 00748 00749 if (!osync_client_kill_old_osplugin(client, error)) 00750 goto error; 00751 00752 if (!osync_queue_exists(client->commands_to_osplugin) || !osync_queue_is_alive(client->commands_to_osplugin)) { 00753 pid_t cpid = fork(); 00754 if (cpid == 0) { 00755 osync_trace_reset_indent(); 00756 00757 /* Export all options to osplugin through environment variables */ 00758 osync_env_export_all_options(osync_group_get_env(engine->group)); 00759 00760 OSyncMember *member = client->member; 00761 OSyncPlugin *plugin = osync_member_get_plugin(member); 00762 const char *path = osync_plugin_get_path(plugin); 00763 setenv("OSYNC_MODULE_LIST", path, 1); 00764 00765 osync_env_export_loaded_modules(osync_group_get_env(engine->group)); 00766 00767 char *memberstring = g_strdup_printf("%lli", osync_member_get_id(client->member)); 00768 execlp(OSPLUGIN, OSPLUGIN, osync_group_get_configdir(engine->group), memberstring, NULL); 00769 00770 if (errno == ENOENT) { 00771 execlp("./osplugin", "osplugin", osync_group_get_configdir(engine->group), memberstring, NULL); 00772 } 00773 00774 osync_trace(TRACE_INTERNAL, "unable to exec"); 00775 exit(1); 00776 } 00777 00778 client->child_pid = cpid; 00779 00780 /* We are going to wait 5 seconds for plugin */ 00781 while (!osync_queue_exists(client->commands_to_osplugin) && waiting <= 5) { 00782 osync_trace(TRACE_INTERNAL, "Waiting for other side to create fifo"); 00783 00784 sleep(1); 00785 waiting++; 00786 } 00787 00788 osync_trace(TRACE_INTERNAL, "Queue was created"); 00789 } 00790 00791 if (client->child_pid) { 00792 if (!osync_client_create_pidfile(client, error)) 00793 goto error; 00794 } 00795 00796 if (!osync_queue_connect(client->commands_to_osplugin, OSYNC_QUEUE_SENDER, error)) 00797 goto error; 00798 00799 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_INITIALIZE, 0, error); 00800 if (!message) 00801 goto error_disconnect; 00802 00803 osync_message_write_string(message, client->commands_from_osplugin->name); 00804 00805 if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error)) 00806 goto error_free_message; 00807 00808 osync_message_unref(message); 00809 00810 osync_trace(TRACE_EXIT, "%s", __func__); 00811 return TRUE; 00812 00813 error_free_message: 00814 osync_message_unref(message); 00815 error_disconnect: 00816 osync_queue_disconnect(client->commands_to_osplugin, NULL); 00817 error: 00818 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00819 return FALSE; 00820 } 00821 00822 osync_bool osync_client_init(OSyncClient *client, OSyncEngine *engine, OSyncError **error) 00823 { 00824 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, client, engine, error); 00825 00826 OSyncMessage *reply = osync_queue_get_message(client->commands_from_osplugin); 00827 00828 osync_trace(TRACE_INTERNAL, "reply received %i", reply->cmd); 00829 if (reply->cmd == OSYNC_MESSAGE_ERRORREPLY) { 00830 if (error) 00831 osync_demarshal_error(reply, error); 00832 goto error_free_reply; 00833 } 00834 00835 if (reply->cmd != OSYNC_MESSAGE_REPLY) { 00836 osync_error_set(error, OSYNC_ERROR_GENERIC, "Invalid answer from plugin process"); 00837 goto error_free_reply; 00838 } 00839 00840 osync_message_unref(reply); 00841 00842 osync_trace(TRACE_EXIT, "%s", __func__); 00843 return TRUE; 00844 00845 error_free_reply: 00846 osync_message_unref(reply); 00847 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00848 return FALSE; 00849 } 00850 00851 osync_bool osync_client_finalize(OSyncClient *client, OSyncError **error) 00852 { 00853 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, client, error); 00854 00855 OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_FINALIZE, 0, error); 00856 if (!message) 00857 goto error; 00858 00859 if (!osync_queue_send_message(client->commands_to_osplugin, NULL, message, error)) 00860 goto error_free_message; 00861 00862 osync_message_unref(message); 00863 00864 if (client->child_pid) { 00865 int status; 00866 if (waitpid(client->child_pid, &status, 0) == -1) { 00867 osync_error_set(error, OSYNC_ERROR_GENERIC, "Error waiting for osplugin process: %s", strerror(errno)); 00868 goto error; 00869 } 00870 00871 if (!WIFEXITED(status)) 00872 osync_trace(TRACE_INTERNAL, "Child has exited abnormally"); 00873 else if (WEXITSTATUS(status) != 0) 00874 osync_trace(TRACE_INTERNAL, "Child has returned non-zero exit status (%d)", WEXITSTATUS(status)); 00875 00876 if (!osync_client_remove_pidfile(client, error)) 00877 goto error; 00878 } 00879 00880 osync_queue_disconnect(client->commands_to_osplugin, NULL); 00881 00882 00883 osync_trace(TRACE_EXIT, "%s", __func__); 00884 return TRUE; 00885 00886 error_free_message: 00887 osync_message_unref(message); 00888 error: 00889 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error)); 00890 return FALSE; 00891 }