001/*
002 * HA-JDBC: High-Availability JDBC
003 * Copyright (c) 2004-2007 Paul Ferraro
004 * 
005 * This library is free software; you can redistribute it and/or modify it 
006 * under the terms of the GNU Lesser General Public License as published by the 
007 * Free Software Foundation; either version 2.1 of the License, or (at your 
008 * option) any later version.
009 * 
010 * This library is distributed in the hope that it will be useful, but WITHOUT
011 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 
012 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License 
013 * for more details.
014 * 
015 * You should have received a copy of the GNU Lesser General Public License
016 * along with this library; if not, write to the Free Software Foundation, 
017 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
018 * 
019 * Contact: ferraro@users.sourceforge.net
020 */
021package net.sf.hajdbc.sync;
022
023import java.sql.Connection;
024import java.sql.SQLException;
025import java.util.HashMap;
026import java.util.Map;
027import java.util.Set;
028import java.util.concurrent.ExecutorService;
029import java.util.concurrent.Executors;
030
031import net.sf.hajdbc.Balancer;
032import net.sf.hajdbc.Database;
033import net.sf.hajdbc.DatabaseCluster;
034import net.sf.hajdbc.DatabaseMetaDataCache;
035import net.sf.hajdbc.DatabaseProperties;
036import net.sf.hajdbc.Dialect;
037import net.sf.hajdbc.SynchronizationContext;
038import net.sf.hajdbc.util.concurrent.DaemonThreadFactory;
039
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * @author Paul Ferraro
045 * @param <D> Driver or DataSource
046 */
047public class SynchronizationContextImpl<D> implements SynchronizationContext<D>
048{
049        private static Logger logger = LoggerFactory.getLogger(SynchronizationContextImpl.class);
050        
051        private Set<Database<D>> activeDatabaseSet;
052        private Database<D> sourceDatabase;
053        private Database<D> targetDatabase;
054        private DatabaseCluster<D> cluster;
055        private DatabaseProperties sourceDatabaseProperties;
056        private DatabaseProperties targetDatabaseProperties;
057        private Map<Database<D>, Connection> connectionMap = new HashMap<Database<D>, Connection>();
058        private ExecutorService executor;
059        
060        /**
061         * @param cluster
062         * @param database
063         * @throws SQLException
064         */
065        public SynchronizationContextImpl(DatabaseCluster<D> cluster, Database<D> database) throws SQLException
066        {
067                this.cluster = cluster;
068                
069                Balancer<D> balancer = cluster.getBalancer();
070                
071                this.sourceDatabase = balancer.next();
072                this.activeDatabaseSet = balancer.all();
073                this.targetDatabase = database;
074                this.executor = Executors.newFixedThreadPool(this.activeDatabaseSet.size(), DaemonThreadFactory.getInstance());
075                
076                DatabaseMetaDataCache cache = cluster.getDatabaseMetaDataCache();
077                
078                this.targetDatabaseProperties = cache.getDatabaseProperties(this.getConnection(this.targetDatabase));
079                this.sourceDatabaseProperties = cache.getDatabaseProperties(this.getConnection(this.sourceDatabase));
080        }
081        
082        /**
083         * @see net.sf.hajdbc.SynchronizationContext#getConnection(net.sf.hajdbc.Database)
084         */
085        @Override
086        public Connection getConnection(Database<D> database) throws SQLException
087        {
088                synchronized (this.connectionMap)
089                {
090                        Connection connection = this.connectionMap.get(database);
091                        
092                        if (connection == null)
093                        {
094                                connection = database.connect(database.createConnectionFactory());
095                                
096                                this.connectionMap.put(database, connection);
097                        }
098                        
099                        return connection;
100                }
101        }
102        
103        /**
104         * @see net.sf.hajdbc.SynchronizationContext#getSourceDatabase()
105         */
106        @Override
107        public Database<D> getSourceDatabase()
108        {
109                return this.sourceDatabase;
110        }
111        
112        /**
113         * @see net.sf.hajdbc.SynchronizationContext#getTargetDatabase()
114         */
115        @Override
116        public Database<D> getTargetDatabase()
117        {
118                return this.targetDatabase;
119        }
120        
121        /**
122         * @see net.sf.hajdbc.SynchronizationContext#getActiveDatabaseSet()
123         */
124        @Override
125        public Set<Database<D>> getActiveDatabaseSet()
126        {
127                return this.activeDatabaseSet;
128        }
129        
130        /**
131         * @see net.sf.hajdbc.SynchronizationContext#getSourceDatabaseProperties()
132         */
133        @Override
134        public DatabaseProperties getSourceDatabaseProperties()
135        {
136                return this.sourceDatabaseProperties;
137        }
138
139        /**
140         * @see net.sf.hajdbc.SynchronizationContext#getTargetDatabaseProperties()
141         */
142        @Override
143        public DatabaseProperties getTargetDatabaseProperties()
144        {
145                return this.targetDatabaseProperties;
146        }
147
148        /**
149         * @see net.sf.hajdbc.SynchronizationContext#getDialect()
150         */
151        @Override
152        public Dialect getDialect()
153        {
154                return this.cluster.getDialect();
155        }
156        
157        /**
158         * @see net.sf.hajdbc.SynchronizationContext#getExecutor()
159         */
160        @Override
161        public ExecutorService getExecutor()
162        {
163                return this.executor;
164        }
165
166        /**
167         * @see net.sf.hajdbc.SynchronizationContext#close()
168         */
169        @Override
170        public void close()
171        {
172                synchronized (this.connectionMap)
173                {
174                        for (Connection connection: this.connectionMap.values())
175                        {
176                                if (connection != null)
177                                {
178                                        try
179                                        {
180                                                if (!connection.isClosed())
181                                                {
182                                                        connection.close();
183                                                }
184                                        }
185                                        catch (SQLException e)
186                                        {
187                                                logger.warn(e.toString(), e);
188                                        }
189                                }
190                        }
191                }
192                
193                this.executor.shutdown();
194        }
195}