001/* 002 * HA-JDBC: High-Availability JDBC 003 * Copyright (c) 2004-2007 Paul Ferraro 004 * 005 * This library is free software; you can redistribute it and/or modify it 006 * under the terms of the GNU Lesser General Public License as published by the 007 * Free Software Foundation; either version 2.1 of the License, or (at your 008 * option) any later version. 009 * 010 * This library is distributed in the hope that it will be useful, but WITHOUT 011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 013 * for more details. 014 * 015 * You should have received a copy of the GNU Lesser General Public License 016 * along with this library; if not, write to the Free Software Foundation, 017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 018 * 019 * Contact: ferraro@users.sourceforge.net 020 */ 021package net.sf.hajdbc.sync; 022 023import java.sql.Connection; 024import java.sql.SQLException; 025import java.util.HashMap; 026import java.util.Map; 027import java.util.Set; 028import java.util.concurrent.ExecutorService; 029import java.util.concurrent.Executors; 030 031import net.sf.hajdbc.Balancer; 032import net.sf.hajdbc.Database; 033import net.sf.hajdbc.DatabaseCluster; 034import net.sf.hajdbc.DatabaseMetaDataCache; 035import net.sf.hajdbc.DatabaseProperties; 036import net.sf.hajdbc.Dialect; 037import net.sf.hajdbc.SynchronizationContext; 038import net.sf.hajdbc.util.concurrent.DaemonThreadFactory; 039 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * @author Paul Ferraro 045 * @param <D> Driver or DataSource 046 */ 047public class SynchronizationContextImpl<D> implements SynchronizationContext<D> 048{ 049 private static Logger logger = LoggerFactory.getLogger(SynchronizationContextImpl.class); 050 051 private Set<Database<D>> activeDatabaseSet; 052 private Database<D> sourceDatabase; 053 private Database<D> targetDatabase; 054 private DatabaseCluster<D> cluster; 055 private DatabaseProperties sourceDatabaseProperties; 056 private DatabaseProperties targetDatabaseProperties; 057 private Map<Database<D>, Connection> connectionMap = new HashMap<Database<D>, Connection>(); 058 private ExecutorService executor; 059 060 /** 061 * @param cluster 062 * @param database 063 * @throws SQLException 064 */ 065 public SynchronizationContextImpl(DatabaseCluster<D> cluster, Database<D> database) throws SQLException 066 { 067 this.cluster = cluster; 068 069 Balancer<D> balancer = cluster.getBalancer(); 070 071 this.sourceDatabase = balancer.next(); 072 this.activeDatabaseSet = balancer.all(); 073 this.targetDatabase = database; 074 this.executor = Executors.newFixedThreadPool(this.activeDatabaseSet.size(), DaemonThreadFactory.getInstance()); 075 076 DatabaseMetaDataCache cache = cluster.getDatabaseMetaDataCache(); 077 078 this.targetDatabaseProperties = cache.getDatabaseProperties(this.getConnection(this.targetDatabase)); 079 this.sourceDatabaseProperties = cache.getDatabaseProperties(this.getConnection(this.sourceDatabase)); 080 } 081 082 /** 083 * @see net.sf.hajdbc.SynchronizationContext#getConnection(net.sf.hajdbc.Database) 084 */ 085 @Override 086 public Connection getConnection(Database<D> database) throws SQLException 087 { 088 synchronized (this.connectionMap) 089 { 090 Connection connection = this.connectionMap.get(database); 091 092 if (connection == null) 093 { 094 connection = database.connect(database.createConnectionFactory()); 095 096 this.connectionMap.put(database, connection); 097 } 098 099 return connection; 100 } 101 } 102 103 /** 104 * @see net.sf.hajdbc.SynchronizationContext#getSourceDatabase() 105 */ 106 @Override 107 public Database<D> getSourceDatabase() 108 { 109 return this.sourceDatabase; 110 } 111 112 /** 113 * @see net.sf.hajdbc.SynchronizationContext#getTargetDatabase() 114 */ 115 @Override 116 public Database<D> getTargetDatabase() 117 { 118 return this.targetDatabase; 119 } 120 121 /** 122 * @see net.sf.hajdbc.SynchronizationContext#getActiveDatabaseSet() 123 */ 124 @Override 125 public Set<Database<D>> getActiveDatabaseSet() 126 { 127 return this.activeDatabaseSet; 128 } 129 130 /** 131 * @see net.sf.hajdbc.SynchronizationContext#getSourceDatabaseProperties() 132 */ 133 @Override 134 public DatabaseProperties getSourceDatabaseProperties() 135 { 136 return this.sourceDatabaseProperties; 137 } 138 139 /** 140 * @see net.sf.hajdbc.SynchronizationContext#getTargetDatabaseProperties() 141 */ 142 @Override 143 public DatabaseProperties getTargetDatabaseProperties() 144 { 145 return this.targetDatabaseProperties; 146 } 147 148 /** 149 * @see net.sf.hajdbc.SynchronizationContext#getDialect() 150 */ 151 @Override 152 public Dialect getDialect() 153 { 154 return this.cluster.getDialect(); 155 } 156 157 /** 158 * @see net.sf.hajdbc.SynchronizationContext#getExecutor() 159 */ 160 @Override 161 public ExecutorService getExecutor() 162 { 163 return this.executor; 164 } 165 166 /** 167 * @see net.sf.hajdbc.SynchronizationContext#close() 168 */ 169 @Override 170 public void close() 171 { 172 synchronized (this.connectionMap) 173 { 174 for (Connection connection: this.connectionMap.values()) 175 { 176 if (connection != null) 177 { 178 try 179 { 180 if (!connection.isClosed()) 181 { 182 connection.close(); 183 } 184 } 185 catch (SQLException e) 186 { 187 logger.warn(e.toString(), e); 188 } 189 } 190 } 191 } 192 193 this.executor.shutdown(); 194 } 195}