• Main Page
  • Related Pages
  • Modules
  • Data Structures
  • Files
  • File List

osengine/osengine_engine.c

00001 /*
00002  * libosengine - A synchronization engine for the opensync framework
00003  * Copyright (C) 2004-2005  Armin Bauer <armin.bauer@opensync.org>
00004  * 
00005  * This library is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU Lesser General Public
00007  * License as published by the Free Software Foundation; either
00008  * version 2.1 of the License, or (at your option) any later version.
00009  * 
00010  * This library is distributed in the hope that it will be useful,
00011  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00012  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00013  * Lesser General Public License for more details.
00014  * 
00015  * You should have received a copy of the GNU Lesser General Public
00016  * License along with this library; if not, write to the Free Software
00017  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
00018  * 
00019  */
00020  
00021 #include "engine.h"
00022 
00023 #include <errno.h>
00024 #include <sys/stat.h>
00025 #include <sys/types.h>
00026 
00027 #include <glib.h>
00028 
00029 #include <opensync/opensync_support.h>
00030 #include "opensync/opensync_message_internals.h"
00031 #include "opensync/opensync_queue_internals.h"
00032 #include "opensync/opensync_format_internals.h"
00033 
00034 #include "engine_internals.h"
00035 #include <opensync/opensync_user_internals.h>
00036 
00037 OSyncMappingEntry *osengine_mappingtable_find_entry(OSyncMappingTable *table, const char *uid, const char *objtype, long long int memberid);
00056 
00057 void _new_change_receiver(OSyncEngine *engine, OSyncClient *client, OSyncChange *change)
00058 {
00059         osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, engine, client, change);
00060 
00061         OSyncError *error = NULL;
00062         OSyncChangeType change_type = osync_change_get_changetype(change);
00063         OSyncFormatEnv *format_env = osync_group_get_format_env(engine->group);
00064         OSyncObjType *objtype = osync_change_get_objtype(change);
00065         const char* uid = osync_change_get_uid(change);
00066         OSyncObjFormat *objformat = osync_change_get_objformat(change);
00067 
00068         osync_change_set_member(change, client->member);
00069 
00070         osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, objtype %s and format %s from member %lli", uid, change_type, 
00071         objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None",
00072         osync_member_get_id(client->member));
00073 
00074 
00081         if ( (change_type != CHANGE_DELETED) &&
00082              (osync_change_has_data(change))) {
00083                 osync_bool is_file_objformat = FALSE;
00084                 if(objformat)
00085                         is_file_objformat = 
00086                                 ((!strcmp(objformat->name, "file"))?(TRUE):(FALSE));
00087                 if ( (!objtype) || (!objformat) ||
00088                      (!strcmp(osync_objtype_get_name(objtype), "data")) ||
00089                      (!strcmp(objformat->name, "plain"))) {
00090                         OSyncObjType *objtype_test = osync_change_detect_objtype_full(format_env, change, &error);
00091                         objtype = (objtype_test)?(objtype_test):(objtype);
00092                 }
00093                 if (objtype) {
00094                         osync_trace(TRACE_INTERNAL, "Detected the object to be of type %s", osync_objtype_get_name(objtype));
00095 
00096                         osync_change_set_objtype(change, objtype);
00097 
00102                         if ( ( (osync_group_get_slow_sync(engine->group,
00103                                  osync_objtype_get_name(objtype))) || 
00104                                ( (!is_file_objformat) &&
00105                                  (!osengine_mappingtable_find_entry(
00106                                         engine->maptable, uid,
00107                                         osync_objtype_get_name(objtype),
00108                                         osync_member_get_id(client->member))) ) 
00109                               ) && (change_type == CHANGE_MODIFIED) ){
00110                                 osync_change_set_changetype(change, CHANGE_ADDED);
00111                                 change_type = osync_change_get_changetype(change);
00112                         }
00113                 }
00114         } else 
00115                 if (change_type == CHANGE_DELETED){
00121                         if ( !objtype ||
00122                              (( !strcmp(osync_objtype_get_name(objtype), "data") ) &&
00123                              ( !osengine_mappingtable_find_entry(
00124                                         engine->maptable, uid,
00125                                  osync_objtype_get_name(objtype),
00126                                 osync_member_get_id(client->member)) )) ){
00127 
00128                                 OSyncMappingEntry *entry = 
00129                                         osengine_mappingtable_find_entry(
00130                                                 engine->maptable, uid, NULL,
00131                                                 osync_member_get_id(client->member)
00132                                         );
00133                                 if (entry) {
00134                                         osync_change_set_objtype(change,
00135                                                  osync_change_get_objtype(
00136                                                         entry->change));
00137                                         objtype=osync_change_get_objtype(change);
00138                                 } else {
00139                                         osync_error_set(&error, OSYNC_ERROR_GENERIC,
00140                                                  "Could not find one entry with UID=%s to delete.", uid);
00141                                         goto error;
00142                                 }
00143                         }
00144                 } else {
00145                         osync_trace(TRACE_INTERNAL, "Change has no data!");
00146                 }
00147         
00148         osync_trace(TRACE_INTERNAL, "Handling new change with uid %s, changetype %i, data %p, size %i, objtype %s and format %s from member %lli", uid, change_type, osync_change_get_data(change), osync_change_get_datasize(change), objtype ? osync_objtype_get_name(objtype) : "None", osync_change_get_objformat(change) ? osync_objformat_get_name(osync_change_get_objformat(change)) : "None", osync_member_get_id(client->member));
00149 
00150         if (!objtype){
00151                 osync_error_set(&error, OSYNC_ERROR_GENERIC,
00152                         "ObjType not set for uid %s.", uid);
00153                 goto error;
00154         }
00155         
00156         
00157         OSyncMappingEntry *entry = osengine_mappingtable_store_change(engine->maptable, change);
00158         change = entry->change;
00159         if (!osync_change_save(change, TRUE, &error)) {
00160                 osync_error_duplicate(&engine->error, &error);
00161                 osync_status_update_change(engine, change, CHANGE_RECV_ERROR, &error);
00162                 osync_error_update(&engine->error, "Unable to receive one or more objects");
00163                 osync_flag_unset(entry->fl_has_data);
00164                 goto error;
00165         }
00166         
00167         osync_group_remove_changelog(engine->group, change, &error);
00168         
00169         //We convert to the common format here to make sure we always pass it
00170         osync_change_convert_to_common(change, NULL);
00171         
00172         if (!entry->mapping) {
00173                 osync_flag_attach(entry->fl_mapped, engine->cmb_entries_mapped);
00174                 osync_flag_unset(entry->fl_mapped);
00175                 osync_debug("ENG", 3, "+It has no mapping");
00176         } else {
00177                 osync_debug("ENG", 3, "+It has mapping");
00178                 osync_flag_set(entry->fl_mapped);
00179                 osync_flag_unset(entry->mapping->fl_solved);
00180                 osync_flag_unset(entry->mapping->fl_chkconflict);
00181                 osync_flag_unset(entry->mapping->fl_multiplied);
00182         }
00183         
00184         if (osync_change_has_data(change)) {
00185                 osync_debug("ENG", 3, "+It has data");
00186                 osync_flag_set(entry->fl_has_data);
00187                 osync_status_update_change(engine, change, CHANGE_RECEIVED, NULL);
00188         } else {
00189                 osync_debug("ENG", 3, "+It has no data");
00190                 osync_flag_unset(entry->fl_has_data);
00191                 osync_status_update_change(engine, change, CHANGE_RECEIVED_INFO, NULL);
00192         }
00193         
00194         if (osync_change_get_changetype(change) == CHANGE_DELETED)
00195                 osync_flag_set(entry->fl_deleted);
00196         
00197         osync_flag_set(entry->fl_has_info);
00198         osync_flag_unset(entry->fl_synced);
00199 
00200         osengine_mappingentry_decider(engine, entry);
00201         
00202         osync_trace(TRACE_EXIT, "%s", __func__);
00203         return;
00204         
00205 error:
00206         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
00207         osync_error_free(&error);
00208         return;
00209 }
00210 
00211 OSyncClient *osengine_get_client(OSyncEngine *engine, long long int memberId)
00212 {
00213         GList *c = NULL;
00214         for (c = engine->clients; c; c = c->next) {
00215                 OSyncClient *client = c->data;
00216                 if (osync_member_get_id(client->member) == memberId)
00217                         return client;
00218         }
00219         return NULL;
00220 }
00221 
00222 
00223 void send_engine_changed(OSyncEngine *engine)
00224 {
00225         if (!engine->is_initialized)
00226                 return;
00227 
00228         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_ENGINE_CHANGED, 0, NULL);
00229         /*FIXME: Handle errors here */
00230 
00231         osync_debug("ENG", 4, "Sending message %p:\"ENGINE_CHANGED\"", message);
00232         osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
00233 }
00234 
00235 void send_mapping_changed(OSyncEngine *engine, OSyncMapping *mapping)
00236 {
00237         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPING_CHANGED, sizeof(long long), NULL);
00238         osync_message_write_long_long_int(message, mapping->id);
00239         /*FIXME: Handle errors here */
00240 
00241         osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
00242         /*FIXME: Handle errors here, too */
00243 }
00244 
00245 void send_mappingentry_changed(OSyncEngine *engine, OSyncMappingEntry *entry)
00246 {
00247         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_MAPPINGENTRY_CHANGED, sizeof(long long)*2, NULL);
00248 
00249         /*FIXME: don't pass a pointer through the messaging system */
00250         long long ptr = (long long)(long)entry;
00251         osync_message_write_long_long_int(message, ptr);
00252         /*FIXME: Handle errors here */
00253 
00254         osync_queue_send_message(engine->commands_to_self, NULL, message, NULL);
00255         /*FIXME: Handle errors here, too */
00256 }
00257 
00265 static void engine_message_handler(OSyncMessage *message, OSyncEngine *engine)
00266 {
00267         osync_trace(TRACE_ENTRY, "engine_message_handler(%p:%lli-%i, %p)", message, message->id1, message->id2, engine);
00268         
00269         OSyncChange *change = NULL;
00270                         
00271         osync_trace(TRACE_INTERNAL, "engine received command %i", osync_message_get_command(message));
00272         
00273         switch (osync_message_get_command(message)) {
00274                 case OSYNC_MESSAGE_SYNCHRONIZE:
00275                         osync_trace(TRACE_INTERNAL, "all deciders");
00276                         osengine_client_all_deciders(engine);
00277                         break;
00278                 case OSYNC_MESSAGE_NEW_CHANGE:
00279                         osync_demarshal_change(message, osync_group_get_format_env(engine->group), &change);
00280                         
00281                         long long int member_id = 0;
00282                         osync_message_read_long_long_int(message, &member_id);
00283                         OSyncClient *sender = osengine_get_client(engine, member_id);
00284                         
00285                         _new_change_receiver(engine, sender, change);
00286                         break;
00287                 case OSYNC_MESSAGE_ENGINE_CHANGED:
00288                         osengine_client_all_deciders(engine);
00289                         osengine_mapping_all_deciders(engine);
00290                         GList *u;
00291                         for (u = engine->maptable->unmapped; u; u = u->next) {
00292                                 OSyncMappingEntry *unmapped = u->data;
00293                                 send_mappingentry_changed(engine, unmapped);
00294                         }
00295                         break;
00296                 case OSYNC_MESSAGE_MAPPING_CHANGED:
00297                 {
00298                         long long id;
00299                         osync_message_read_long_long_int(message, &id);
00300                         /*FIXME: check errors by read_long_long_int */
00301                         OSyncMapping *mapping = osengine_mappingtable_mapping_from_id(engine->maptable, id);
00302                         
00303                         if (!g_list_find(engine->maptable->mappings, mapping)) {
00304                                 osync_trace(TRACE_EXIT, "%s: Mapping %p is dead", __func__, mapping);
00305                                 return;
00306                         }
00307                         
00308                         osengine_mapping_decider(engine, mapping);
00309                 }
00310                 break;
00311                 case OSYNC_MESSAGE_MAPPINGENTRY_CHANGED:
00312                 {
00313                         long long ptr;
00314                         osync_message_read_long_long_int(message, &ptr);
00315                         OSyncMappingEntry *entry = (OSyncMappingEntry*)(long)ptr;
00316                         
00317                         if (!g_list_find(engine->maptable->entries, entry) && !g_list_find(engine->maptable->unmapped, entry)) {
00318                                 osync_trace(TRACE_EXIT, "%s: Entry %p is dead", __func__, entry);
00319                                 return;
00320                         }
00321                         
00322                         osengine_mappingentry_decider(engine, entry);
00323                 }
00324                 break;
00325                 case OSYNC_MESSAGE_SYNC_ALERT:
00326                         if (engine->allow_sync_alert)
00327                                 osync_flag_set(engine->fl_running);
00328                         else
00329                                 osync_trace(TRACE_INTERNAL, "Sync Alert not allowed");
00330                 break;
00331 
00332                 default:
00333                         break;
00334         }
00335         
00336         /*TODO: Implement handling of the messages listed below, on commented code */
00337 
00338         /*      
00339         if (osync_message_is_signal (message, "CLIENT_CHANGED")) {
00340                 OSyncClient *client = osync_message_get_data(message, "client");
00341                 
00342                 if (!g_list_find(engine->clients, client)) {
00343                         osync_trace(TRACE_EXIT, "%s: Client %p is dead", __func__, client);
00344                         return;
00345                 }
00346                 
00347                 osengine_client_decider(engine, client);
00348                 osync_trace(TRACE_EXIT, "engine_message_handler");
00349                 return;
00350         }
00351         
00352         if (osync_message_is_signal (message, "PLUGIN_MESSAGE")) {
00353                 char *name = osync_message_get_data(message, "name");
00354                 void *data = osync_message_get_data(message, "data");
00355                 engine->plgmsg_callback(engine, sender, name, data, engine->plgmsg_userdata);
00356                 osync_trace(TRACE_EXIT, "engine_message_handler");
00357                 return;
00358         }
00359         
00360         osync_debug("ENG", 0, "Unknown message \"%s\"", osync_message_get_msgname(message));
00361         osync_trace(TRACE_EXIT_ERROR, "engine_message_handler: Unknown message");
00362         g_assert_not_reached();*/
00363         osync_trace(TRACE_EXIT, "%s", __func__);
00364 }
00365 
00366 static void trigger_clients_sent_changes(OSyncEngine *engine)
00367 {
00368         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00369         osync_status_update_engine(engine, ENG_ENDPHASE_READ, NULL);
00370         
00371         g_mutex_lock(engine->info_received_mutex);
00372         g_cond_signal(engine->info_received);
00373         g_mutex_unlock(engine->info_received_mutex);
00374         
00375         //Load the old mappings
00376         osengine_mappingtable_inject_changes(engine->maptable);
00377         
00378         send_engine_changed(engine);
00379         osync_trace(TRACE_EXIT, "%s", __func__);
00380 }
00381 
00382 static void trigger_clients_read_all(OSyncEngine *engine)
00383 {
00384         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00385 
00386         send_engine_changed(engine);
00387         osync_trace(TRACE_EXIT, "%s", __func__);
00388 }
00389 
00390 static void trigger_status_end_conflicts(OSyncEngine *engine)
00391 {
00392         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00393         osync_status_update_engine(engine, ENG_END_CONFLICTS, NULL);
00394         
00395         osync_trace(TRACE_EXIT, "%s", __func__);
00396 }
00397 
00398 static void trigger_clients_connected(OSyncEngine *engine)
00399 {
00400         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00401         osync_status_update_engine(engine, ENG_ENDPHASE_CON, NULL);
00402         osengine_client_all_deciders(engine);
00403         
00404         osync_trace(TRACE_EXIT, "%s", __func__);
00405 }
00406 
00407 static void trigger_clients_comitted_all(OSyncEngine *engine)
00408 {
00409         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00410         osync_status_update_engine(engine, ENG_ENDPHASE_WRITE, NULL);
00411         
00412         osync_trace(TRACE_EXIT, "%s", __func__);
00413 }
00414 
00415 
00416 /*void send_engine_committed_all(OSyncEngine *engine)
00417 {
00418         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00419         
00420         engine->committed_all_sent = TRUE;
00421         
00422         osync_trace(TRACE_INTERNAL, "++++ ENGINE COMMAND: Committed all ++++");
00423                 
00424         GList *c = NULL;
00425         for (c = engine->clients; c; c = c->next) {
00426                 OSyncClient *client = c->data;
00427                 if (osync_flag_is_not_set(client->fl_committed_all))
00428                         send_committed_all(client, engine);
00429         }
00430         
00431         osync_trace(TRACE_EXIT, "%s", __func__);
00432 }
00433 
00434 static void trigger_engine_committed_all(OSyncEngine *engine)
00435 {
00436         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
00437         
00438         if (osync_flag_is_not_set(engine->cmb_multiplied)) {
00439                 osync_trace(TRACE_EXIT, "%s: Not multiplied yet", __func__);
00440                 return;
00441         }
00442         
00443         send_engine_committed_all(engine);
00444         
00445         osync_trace(TRACE_EXIT, "%s", __func__);
00446 }*/
00447 
00448 static gboolean startupfunc(gpointer data)
00449 {
00450         OSyncEngine *engine = data;
00451         osync_trace(TRACE_INTERNAL, "+++++++++ This is the engine of group \"%s\" +++++++++", osync_group_get_name(engine->group));
00452         
00453         OSyncError *error = NULL;
00454         if (!osengine_mappingtable_load(engine->maptable, &error)) {
00455                 osync_error_duplicate(&engine->error, &error);
00456                 osync_status_update_engine(engine, ENG_ERROR, &error);
00457                 osync_error_update(&engine->error, "Unable to connect one of the members");
00458                 osync_flag_set(engine->fl_stop);
00459         }
00460         
00461         g_mutex_lock(engine->started_mutex);
00462         g_cond_signal(engine->started);
00463         g_mutex_unlock(engine->started_mutex);
00464         return FALSE;
00465 }
00466 
00478 
00489 osync_bool osengine_reset(OSyncEngine *engine, OSyncError **error)
00490 {
00491         //FIXME Check if engine is running
00492         osync_trace(TRACE_ENTRY, "osengine_reset(%p, %p)", engine, error);
00493         GList *c = NULL;
00494         for (c = engine->clients; c; c = c->next) {
00495                 OSyncClient *client = c->data;
00496                 osync_client_reset(client);
00497         }
00498         
00499         osync_flag_set_state(engine->fl_running, FALSE);
00500         osync_flag_set_state(engine->fl_stop, FALSE);
00501         osync_flag_set_state(engine->cmb_sent_changes, FALSE);
00502         osync_flag_set_state(engine->cmb_entries_mapped, TRUE);
00503         osync_flag_set_state(engine->cmb_synced, TRUE);
00504         osync_flag_set_state(engine->cmb_chkconflict, TRUE);
00505         osync_flag_set_state(engine->cmb_finished, FALSE);
00506         osync_flag_set_state(engine->cmb_connected, FALSE);
00507         osync_flag_set_state(engine->cmb_read_all, TRUE);
00508         osync_flag_set_state(engine->cmb_committed_all, TRUE);
00509         osync_flag_set_state(engine->cmb_committed_all_sent, FALSE);
00510         
00511         osync_status_update_engine(engine, ENG_ENDPHASE_DISCON, NULL);
00512         
00513         engine->committed_all_sent = FALSE;
00514         
00515         osengine_mappingtable_reset(engine->maptable);
00516         
00517         if (engine->error) {
00518                 //FIXME We might be leaking memory here
00519                 OSyncError *newerror = NULL;
00520                 osync_error_duplicate(&newerror, &engine->error);
00521                 osync_status_update_engine(engine, ENG_ERROR, &newerror);
00522                 osync_group_set_slow_sync(engine->group, "data", TRUE);
00523         } else {
00524                 osync_status_update_engine(engine, ENG_SYNC_SUCCESSFULL, NULL);
00525                 osync_group_reset_slow_sync(engine->group, "data");
00526         }
00527         
00528         osync_trace(TRACE_INTERNAL, "engine error is %p", engine->error);
00529         
00530         g_mutex_lock(engine->syncing_mutex);
00531         g_cond_signal(engine->syncing);
00532         g_mutex_unlock(engine->syncing_mutex);
00533 
00534         osync_trace(TRACE_EXIT, "osengine_reset");
00535         return TRUE;
00536 }
00537 
/* Implementation of g_mkdir_with_parents()
 *
 * Creates 'dir' and any missing parent directories.
 *
 * The buffer behind 'dir' is modified temporarily while the parents are
 * created (the last slash is replaced by a NUL) and is restored before the
 * function returns, on success and on failure alike.
 *
 * A path that already names a directory is a success, including the case
 * where mkdir() reports EEXIST because the path has a trailing or doubled
 * slash and the directory was just created through its parent.
 *
 * Returns 0 on success, -1 on failure with errno set by the failing call.
 */
static int __mkdir_with_parents(char *dir, int mode)
{
        struct stat st;

        if (stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
                return 0;

        char *slash = strrchr(dir, '/');
        if (slash && slash != dir) {
                /* Create parent directory if needed */

                /* This is a trick: I don't want to allocate a new string
                 * for the parent directory. So, just put a NUL char
                 * in the last slash, and restore it after creating the
                 * parent directory
                 */
                *slash = '\0';
                int parent_result = __mkdir_with_parents(dir, mode);
                *slash = '/';
                if (parent_result < 0)
                        return -1;
        }

        if (mkdir(dir, (mode_t)mode) < 0) {
                /* Keep the errno of mkdir(); the stat() below may change it. */
                int mkdir_errno = errno;

                if (mkdir_errno == EEXIST && stat(dir, &st) == 0 && S_ISDIR(st.st_mode))
                        return 0;

                errno = mkdir_errno;
                return -1;
        }

        return 0;
}
00567 
/* Create 'dir' together with any missing parents.
 *
 * Works on a private copy of the path, so the caller's string is never
 * touched. Returns the result of __mkdir_with_parents(), or -1 if the copy
 * could not be allocated.
 */
static int mkdir_with_parents(const char *dir, int mode)
{
        char *scratch = strdup(dir);

        if (scratch == NULL)
                return -1;

        int result = __mkdir_with_parents(scratch, mode);

        free(scratch);
        return result;
}
00579 
00589 OSyncEngine *osengine_new(OSyncGroup *group, OSyncError **error)
00590 {
00591         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, group, error);
00592         
00593         g_assert(group);
00594         OSyncEngine *engine = g_malloc0(sizeof(OSyncEngine));
00595         osync_group_set_data(group, engine);
00596         
00597         if (!g_thread_supported ())
00598                 g_thread_init (NULL);
00599         
00600         engine->context = g_main_context_new();
00601         engine->syncloop = g_main_loop_new(engine->context, FALSE);
00602         engine->group = group;
00603 
00604         OSyncUserInfo *user = osync_user_new(error);
00605         if (!user)
00606                 goto error;
00607 
00608         char *enginesdir = g_strdup_printf("%s/engines", osync_user_get_confdir(user));
00609         char *path = g_strdup_printf("%s/enginepipe", enginesdir);
00610 
00611         if (mkdir_with_parents(enginesdir, 0755) < 0) {
00612                 osync_error_set(error, OSYNC_ERROR_GENERIC, "Couldn't create engines directory: %s", strerror(errno));
00613                 goto error_free_paths;
00614         }
00615 
00616         engine->syncing_mutex = g_mutex_new();
00617         engine->info_received_mutex = g_mutex_new();
00618         engine->syncing = g_cond_new();
00619         engine->info_received = g_cond_new();
00620         engine->started_mutex = g_mutex_new();
00621         engine->started = g_cond_new();
00622                 
00623         //Set the default start flags
00624         engine->fl_running = osync_flag_new(NULL);
00625         osync_flag_set_pos_trigger(engine->fl_running, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
00626 
00627         engine->fl_sync = osync_flag_new(NULL);
00628         engine->fl_stop = osync_flag_new(NULL);
00629         osync_flag_set_pos_trigger(engine->fl_stop, (OSyncFlagTriggerFunc)osengine_client_all_deciders, engine, NULL);
00630         
00631         //The combined flags
00632         engine->cmb_sent_changes = osync_comb_flag_new(FALSE, FALSE);
00633         osync_flag_set_pos_trigger(engine->cmb_sent_changes, (OSyncFlagTriggerFunc)trigger_clients_sent_changes, engine, NULL);
00634         
00635         engine->cmb_read_all = osync_comb_flag_new(FALSE, TRUE);
00636         osync_flag_set_pos_trigger(engine->cmb_read_all, (OSyncFlagTriggerFunc)trigger_clients_read_all, engine, NULL);
00637         
00638         engine->cmb_entries_mapped = osync_comb_flag_new(FALSE, FALSE);
00639         osync_flag_set_pos_trigger(engine->cmb_entries_mapped, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
00640 
00641         
00642         engine->cmb_synced = osync_comb_flag_new(FALSE, TRUE);
00643         osync_flag_set_pos_trigger(engine->cmb_synced, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
00644 
00645         
00646         engine->cmb_finished = osync_comb_flag_new(FALSE, TRUE);
00647         osync_flag_set_pos_trigger(engine->cmb_finished, (OSyncFlagTriggerFunc)osengine_reset, engine, NULL);
00648         
00649         engine->cmb_connected = osync_comb_flag_new(FALSE, FALSE);
00650         osync_flag_set_pos_trigger(engine->cmb_connected, (OSyncFlagTriggerFunc)trigger_clients_connected, engine, NULL);
00651 
00652         engine->cmb_chkconflict = osync_comb_flag_new(FALSE, TRUE);
00653         osync_flag_set_pos_trigger(engine->cmb_chkconflict, (OSyncFlagTriggerFunc)trigger_status_end_conflicts, engine, NULL);
00654         
00655         engine->cmb_multiplied = osync_comb_flag_new(FALSE, TRUE);
00656         
00657         engine->cmb_committed_all = osync_comb_flag_new(FALSE, TRUE);
00658         osync_flag_set_pos_trigger(engine->cmb_committed_all, (OSyncFlagTriggerFunc)send_engine_changed, engine, NULL);
00659 
00660 
00661         engine->cmb_committed_all_sent = osync_comb_flag_new(FALSE, TRUE);
00662         osync_flag_set_pos_trigger(engine->cmb_committed_all_sent, (OSyncFlagTriggerFunc)trigger_clients_comitted_all, engine, NULL);
00663         
00664         osync_flag_set(engine->fl_sync);
00665         
00666         int i;
00667         for (i = 0; i < osync_group_num_members(group); i++) {
00668                 OSyncMember *member = osync_group_nth_member(group, i);
00669                 if (!osync_client_new(engine, member, error))
00670                         goto error_free_paths;
00671         }
00672         
00673         engine->maptable = osengine_mappingtable_new(engine);
00674         
00675         osync_trace(TRACE_EXIT, "osengine_new: %p", engine);
00676         return engine;
00677 
00678 error_free_paths:
00679         g_free(path);
00680         g_free(enginesdir);
00681 error:
00682         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
00683         return NULL;
00684 }
00685 
00693 void osengine_free(OSyncEngine *engine)
00694 {
00695         osync_trace(TRACE_ENTRY, "osengine_free(%p)", engine);
00696         
00697         GList *c = NULL;
00698         for (c = engine->clients; c; c = c->next) {
00699                 OSyncClient *client = c->data;
00700                 osync_client_free(client);
00701         }
00702         
00703         osengine_mappingtable_free(engine->maptable);
00704         engine->maptable = NULL;
00705         
00706         osync_flag_free(engine->fl_running);
00707         osync_flag_free(engine->fl_sync);
00708         osync_flag_free(engine->fl_stop);
00709         osync_flag_free(engine->cmb_sent_changes);
00710         osync_flag_free(engine->cmb_entries_mapped);
00711         osync_flag_free(engine->cmb_synced);
00712         osync_flag_free(engine->cmb_chkconflict);
00713         osync_flag_free(engine->cmb_finished);
00714         osync_flag_free(engine->cmb_connected);
00715         osync_flag_free(engine->cmb_read_all);
00716         osync_flag_free(engine->cmb_multiplied);
00717         osync_flag_free(engine->cmb_committed_all);
00718         osync_flag_free(engine->cmb_committed_all_sent);
00719         
00720         g_list_free(engine->clients);
00721         g_main_loop_unref(engine->syncloop);
00722         
00723         g_main_context_unref(engine->context);
00724         
00725         g_mutex_free(engine->syncing_mutex);
00726         g_mutex_free(engine->info_received_mutex);
00727         g_cond_free(engine->syncing);
00728         g_cond_free(engine->info_received);
00729         g_mutex_free(engine->started_mutex);
00730         g_cond_free(engine->started);
00731         
00732         g_free(engine);
00733         osync_trace(TRACE_EXIT, "osengine_free");
00734 }
00735 
00745 void osengine_set_conflict_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncMapping *, void *), void *user_data)
00746 {
00747         engine->conflict_callback = function;
00748         engine->conflict_userdata = user_data;
00749 }
00750 
00760 void osengine_set_changestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncChangeUpdate *, void *), void *user_data)
00761 {
00762         engine->changestat_callback = function;
00763         engine->changestat_userdata = user_data;
00764 }
00765 
00775 void osengine_set_mappingstatus_callback(OSyncEngine *engine, void (* function) (OSyncMappingUpdate *, void *), void *user_data)
00776 {
00777         engine->mapstat_callback = function;
00778         engine->mapstat_userdata = user_data;
00779 }
00780 
00790 void osengine_set_enginestatus_callback(OSyncEngine *engine, void (* function) (OSyncEngine *, OSyncEngineUpdate *, void *), void *user_data)
00791 {
00792         engine->engstat_callback = function;
00793         engine->engstat_userdata = user_data;
00794 }
00795 
00805 void osengine_set_memberstatus_callback(OSyncEngine *engine, void (* function) (OSyncMemberUpdate *, void *), void *user_data)
00806 {
00807         engine->mebstat_callback = function;
00808         engine->mebstat_userdata = user_data;
00809 }
00810 
00820 void osengine_set_message_callback(OSyncEngine *engine, void *(* function) (OSyncEngine *, OSyncClient *, const char *, void *, void *), void *user_data)
00821 {
00822         engine->plgmsg_callback = function;
00823         engine->plgmsg_userdata = user_data;
00824 }
00825 
00837 osync_bool osengine_init(OSyncEngine *engine, OSyncError **error)
00838 {
00839         osync_trace(TRACE_ENTRY, "osengine_init(%p, %p)", engine, error);
00840         
00841         if (engine->is_initialized) {
00842                 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "This engine was already initialized");
00843                 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00844                 return FALSE;
00845         }
00846         
00847         switch (osync_group_lock(engine->group)) {
00848                 case OSYNC_LOCKED:
00849                         osync_error_set(error, OSYNC_ERROR_LOCKED, "Group is locked");
00850                         osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00851                         return FALSE;
00852                 case OSYNC_LOCK_STALE:
00853                         osync_debug("ENG", 1, "Detected stale lock file. Slow-syncing");
00854                         osync_status_update_engine(engine, ENG_PREV_UNCLEAN, NULL);
00855                         osync_group_set_slow_sync(engine->group, "data", TRUE);
00856                         break;
00857                 default:
00858                         break;
00859         }
00860         
00861         osync_flag_set(engine->cmb_entries_mapped);
00862         osync_flag_set(engine->cmb_synced);
00863         engine->allow_sync_alert = TRUE;
00864         
00865         //OSyncMember *member = NULL;
00866         OSyncGroup *group = engine->group;
00867         
00868         if (osync_group_num_members(group) < 2) {
00869                 //Not enough members!
00870                 osync_error_set(error, OSYNC_ERROR_MISCONFIGURATION, "You only configured %i members, but at least 2 are needed", osync_group_num_members(group));
00871                 osync_group_unlock(engine->group, TRUE);
00872                 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00873                 return FALSE;
00874         }
00875         
00876         engine->is_initialized = TRUE;
00877         
00878         osync_trace(TRACE_INTERNAL, "Spawning clients");
00879         GList *c = NULL;
00880         for (c = engine->clients; c; c = c->next) {
00881                 OSyncClient *client = c->data;
00882                 osync_queue_create(client->commands_from_osplugin, NULL);
00883 
00884                 if (!osync_client_spawn(client, engine, error)) {
00885                         osync_group_unlock(engine->group, TRUE);
00886                         osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00887                         return FALSE;
00888                 }
00889 
00890                 osync_queue_set_message_handler(client->commands_from_osplugin, (OSyncMessageHandler)engine_message_handler, engine);
00891                 if (!(engine->man_dispatch))
00892                         osync_queue_setup_with_gmainloop(client->commands_from_osplugin, engine->context);
00893                 osync_trace(TRACE_INTERNAL, "opening client queue");
00894                 if (!osync_queue_connect(client->commands_from_osplugin, OSYNC_QUEUE_RECEIVER, 0 )) {
00895                         osync_group_unlock(engine->group, TRUE);
00896                         osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00897                         return FALSE;
00898                 }
00899         }
00900         
00901         osync_trace(TRACE_INTERNAL, "opening engine queue");
00902         if (!osync_queue_new_pipes(&engine->commands_from_self, &engine->commands_to_self, error)) {
00903                 osync_group_unlock(engine->group, TRUE);
00904                 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00905                 return FALSE;
00906         }
00907 
00908         if (!osync_queue_connect(engine->commands_from_self, OSYNC_QUEUE_RECEIVER, 0 )) {
00909                 osync_group_unlock(engine->group, TRUE);
00910                 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00911                 return FALSE;
00912         }
00913         
00914         if (!osync_queue_connect(engine->commands_to_self, OSYNC_QUEUE_SENDER, 0 )) {
00915                 osync_group_unlock(engine->group, TRUE);
00916                 osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00917                 return FALSE;
00918         }
00919         
00920         osync_queue_set_message_handler(engine->commands_from_self, (OSyncMessageHandler)engine_message_handler, engine);
00921         if (!(engine->man_dispatch))
00922                 osync_queue_setup_with_gmainloop(engine->commands_from_self, engine->context);
00923         
00924         osync_trace(TRACE_INTERNAL, "initializing clients");
00925         for (c = engine->clients; c; c = c->next) {
00926                 OSyncClient *client = c->data;
00927                 if (!osync_client_init(client, engine, error)) {
00928                         osengine_finalize(engine);
00929                         osync_group_unlock(engine->group, TRUE);
00930                         osync_trace(TRACE_EXIT_ERROR, "osengine_init: %s", osync_error_print(error));
00931                         return FALSE;
00932                 }
00933         }
00934         
00935         osync_debug("ENG", 3, "Running the main loop");
00936 
00937         //Now we can run the main loop
00938         //We protect the startup by a g_cond
00939         g_mutex_lock(engine->started_mutex);
00940         GSource *idle = g_idle_source_new();
00941         g_source_set_priority(idle, G_PRIORITY_HIGH);
00942         g_source_set_callback(idle, startupfunc, engine, NULL);
00943     g_source_attach(idle, engine->context);
00944         engine->thread = g_thread_create ((GThreadFunc)g_main_loop_run, engine->syncloop, TRUE, NULL);
00945         g_cond_wait(engine->started, engine->started_mutex);
00946         g_mutex_unlock(engine->started_mutex);
00947         
00948         osync_trace(TRACE_EXIT, "osengine_init");
00949         return TRUE;
00950 }
00951 
00960 void osengine_finalize(OSyncEngine *engine)
00961 {
00962         //FIXME check if engine is running
00963         osync_trace(TRACE_ENTRY, "osengine_finalize(%p)", engine);
00964 
00965         if (!engine->is_initialized) {
00966                 osync_trace(TRACE_EXIT_ERROR, "osengine_finalize: Not initialized");
00967                 return;
00968         }
00969         
00970         g_assert(engine);
00971         osync_debug("ENG", 3, "finalizing engine %p", engine);
00972         
00973         if (engine->thread) {
00974                 g_main_loop_quit(engine->syncloop);
00975                 g_thread_join(engine->thread);
00976         }
00977         
00978         GList *c = NULL;
00979         for (c = engine->clients; c; c = c->next) {
00980                 OSyncClient *client = c->data;
00981                 osync_queue_disconnect(client->commands_from_osplugin, NULL);
00982                 osync_client_finalize(client, NULL);
00983         }
00984 
00985         osync_queue_disconnect(engine->commands_from_self, NULL);
00986         osync_queue_disconnect(engine->commands_to_self, NULL);
00987 
00988         osync_queue_free(engine->commands_from_self);
00989         engine->commands_from_self = NULL;
00990         osync_queue_free(engine->commands_to_self);
00991         engine->commands_to_self = NULL;
00992         
00993         osengine_mappingtable_close(engine->maptable);
00994         
00995         if (engine->error) {
00996                 /* If the error occured during connect, we
00997                  * dont want to trigger a slow-sync the next
00998                  * time. In the case the we have a slow-sync
00999                  * right in the beginning, we also dont remove
01000                  * the lockfile to trigger a slow-sync again
01001                  * next time */
01002                 if (!osync_flag_is_set(engine->cmb_connected) && !engine->slowsync)
01003                         osync_group_unlock(engine->group, TRUE);
01004                 else
01005                         osync_group_unlock(engine->group, FALSE);
01006         } else
01007                 osync_group_unlock(engine->group, TRUE);
01008         
01009         engine->is_initialized = FALSE;
01010         osync_trace(TRACE_EXIT, "osengine_finalize");
01011 }
01012 
01023 osync_bool osengine_synchronize(OSyncEngine *engine, OSyncError **error)
01024 {
01025         osync_trace(TRACE_INTERNAL, "synchronize now");
01026         osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
01027         g_assert(engine);
01028         
01029         if (!engine->is_initialized) {
01030                 osync_error_set(error, OSYNC_ERROR_GENERIC, "osengine_synchronize: Not initialized");
01031                 goto error;
01032         }
01033         
01034         /* We now remember if slow-sync is set right from the start.
01035          * If it is, we dont remove the lock file in the case of
01036          * a error during connect. */
01037         if (osync_group_get_slow_sync(engine->group, "data")) {
01038                 engine->slowsync = TRUE;
01039         } else {
01040                 engine->slowsync = FALSE;
01041         }
01042         
01043         engine->wasted = 0;
01044         engine->alldeciders = 0;
01045         
01046         osync_flag_set(engine->fl_running);
01047         
01048         OSyncMessage *message = osync_message_new(OSYNC_MESSAGE_SYNCHRONIZE, 0, error);
01049         if (!message)
01050                 goto error;
01051         
01052         if (!osync_queue_send_message(engine->commands_to_self, NULL, message, error))
01053                 goto error_free_message;
01054         
01055         osync_message_unref(message);
01056         
01057         osync_trace(TRACE_EXIT, "%s", __func__);
01058         return TRUE;
01059 
01060 error_free_message:
01061         osync_message_unref(message);
01062 error:
01063         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01064         return FALSE;
01065 }
01066 
01074 void osengine_flag_only_info(OSyncEngine *engine)
01075 {
01076         osync_flag_unset(engine->fl_sync);
01077 }
01078 
01086 void osengine_flag_manual(OSyncEngine *engine)
01087 {
01088         if (engine->syncloop) {
01089                 g_warning("Unable to flag manual since engine is already initialized\n");
01090         }
01091         engine->man_dispatch = TRUE;
01092 }
01093 
01100 void osengine_pause(OSyncEngine *engine)
01101 {
01102         osync_flag_unset(engine->fl_running);
01103 }
01104 
01112 void osengine_abort(OSyncEngine *engine)
01113 {
01114         osync_flag_set(engine->fl_stop);
01115 }
01116 
01123 void osengine_allow_sync_alert(OSyncEngine *engine)
01124 {
01125         engine->allow_sync_alert = TRUE;
01126 }
01127 
01134 void osengine_deny_sync_alert(OSyncEngine *engine)
01135 {
01136         engine->allow_sync_alert = FALSE;
01137 }
01138 
01149 osync_bool osengine_sync_and_block(OSyncEngine *engine, OSyncError **error)
01150 {
01151         osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
01152         
01153         g_mutex_lock(engine->syncing_mutex);
01154         
01155         if (!osengine_synchronize(engine, error)) {
01156                 g_mutex_unlock(engine->syncing_mutex);
01157                 goto error;
01158         }
01159         
01160         g_cond_wait(engine->syncing, engine->syncing_mutex);
01161         g_mutex_unlock(engine->syncing_mutex);
01162         
01163         if (engine->error) {
01164                 osync_error_duplicate(error, &(engine->error));
01165                 goto error;
01166         }
01167         
01168         osync_trace(TRACE_EXIT, "%s", __func__);
01169         return TRUE;
01170 
01171 error:
01172         osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
01173         return FALSE;
01174 }
01175 
01186 osync_bool osengine_wait_sync_end(OSyncEngine *engine, OSyncError **error)
01187 {
01188         g_mutex_lock(engine->syncing_mutex);
01189         g_cond_wait(engine->syncing, engine->syncing_mutex);
01190         g_mutex_unlock(engine->syncing_mutex);
01191         
01192         if (engine->error) {
01193                 osync_error_duplicate(error, &(engine->error));
01194                 return FALSE;
01195         }
01196         return TRUE;
01197 }
01198 
01205 void osengine_wait_info_end(OSyncEngine *engine)
01206 {
01207         g_mutex_lock(engine->info_received_mutex);
01208         g_cond_wait(engine->info_received, engine->info_received_mutex);
01209         g_mutex_unlock(engine->info_received_mutex);
01210 }
01211 
01216 void osengine_one_iteration(OSyncEngine *engine)
01217 {
01218         /*TODO: Reimplement support to stepping mode on engine */
01219         abort();//osync_queue_dispatch(engine->incoming);
01220 }
01221 
01228 OSyncMapping *osengine_mapping_from_id(OSyncEngine *engine, long long int id)
01229 {
01230         return osengine_mappingtable_mapping_from_id(engine->maptable, id);
01231 }
01232 

Generated on Mon Jul 26 2010 for OpenSync by  doxygen 1.7.1