/*
 * HA-JDBC: High-Availability JDBC
 * Copyright (c) 2004-2007 Paul Ferraro
 *
 * This library is free software; you can redistribute it and/or modify it
 * under the terms of the GNU Lesser General Public License as published by the
 * Free Software Foundation; either version 2.1 of the License, or (at your
 * option) any later version.
 *
 * This library is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this library; if not, write to the Free Software Foundation,
 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Contact: ferraro@users.sourceforge.net
 */
package net.sf.hajdbc.sql;

import java.lang.reflect.Method;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;

import net.sf.hajdbc.Database;
import net.sf.hajdbc.DatabaseProperties;
import net.sf.hajdbc.LockManager;
import net.sf.hajdbc.Messages;
import net.sf.hajdbc.TableProperties;
import net.sf.hajdbc.util.SQLExceptionFactory;
import net.sf.hajdbc.util.reflect.Methods;

/**
 * Base invocation handler for proxied {@link Statement} objects whose parent is a {@link Connection}.
 * It selects an invocation strategy per {@link Statement} method, tracks the SQL and invokers of the
 * current batch, and customizes partial-failure handling when auto-commit is off.
 *
 * @author Paul Ferraro
 * @param <D> the type parameter of the {@link Database} objects this handler works with
 * @param <S> the proxied statement type
 */
@SuppressWarnings("nls")
public abstract class AbstractStatementInvocationHandler<D, S extends Statement> extends AbstractChildInvocationHandler<D, Connection, S>
{
	// Statement methods dispatched with a DriverReadInvocationStrategy
	private static final Set<Method> driverReadMethodSet = Methods.findMethods(Statement.class, "getFetchDirection", "getFetchSize", "getGeneratedKeys", "getMaxFieldSize", "getMaxRows", "getQueryTimeout", "getResultSetConcurrency", "getResultSetHoldability", "getResultSetType", "getUpdateCount", "getWarnings", "isClosed", "isPoolable");
	// Statement methods dispatched with a DriverWriteInvocationStrategy; these are also the recordable methods
	private static final Set<Method> driverWriteMethodSet = Methods.findMethods(Statement.class, "clearWarnings", "setCursorName", "setEscapeProcessing", "setFetchDirection", "setFetchSize", "setMaxFieldSize", "setMaxRows", "setPoolable", "setQueryTimeout");
	// Methods found by the "execute(Update)?" name pattern
	private static final Set<Method> executeMethodSet = Methods.findMethods(Statement.class, "execute(Update)?");

	private static final Method getConnectionMethod = Methods.getMethod(Statement.class, "getConnection");
	private static final Method executeQueryMethod = Methods.getMethod(Statement.class, "executeQuery", String.class);
	private static final Method clearBatchMethod = Methods.getMethod(Statement.class, "clearBatch");
	private static final Method executeBatchMethod = Methods.getMethod(Statement.class, "executeBatch");
	private static final Method getMoreResultsMethod = Methods.getMethod(Statement.class, "getMoreResults", Integer.TYPE);
	private static final Method getResultSetMethod = Methods.getMethod(Statement.class, "getResultSet");
	private static final Method addBatchMethod = Methods.getMethod(Statement.class, "addBatch", String.class);
	private static final Method closeMethod = Methods.getMethod(Statement.class, "close");

	protected TransactionContext<D> transactionContext;
	protected FileSupport fileSupport;

	// Invokers of the addBatch calls of the current batch. This list doubles as the monitor guarding
	// its own access, so the reference must never change: hence final.
	private final List<Invoker<D, S, ?>> invokerList = new LinkedList<Invoker<D, S, ?>>();
	// SQL strings of the current batch, used to compute locks for executeBatch.
	// NOTE(review): unlike invokerList, access to this list is not synchronized - confirm that is intended.
	private final List<String> sqlList = new LinkedList<String>();

	/**
	 * @param connection the parent connection of this statement
	 * @param proxy the parent invocation handler
	 * @param invoker the invoker that created this statement
	 * @param statementClass the class object of the proxied statement type
	 * @param statementMap a map of database to underlying statement
	 * @param transactionContext the context used to start transactional invocation strategies
	 * @param fileSupport support object for streams
	 * @throws Exception if the superclass constructor fails
	 */
	protected AbstractStatementInvocationHandler(Connection connection, SQLProxy<D, Connection> proxy, Invoker<D, Connection, S> invoker, Class<S> statementClass, Map<Database<D>, S> statementMap, TransactionContext<D> transactionContext, FileSupport fileSupport) throws Exception
	{
		super(connection, proxy, invoker, statementClass, statementMap);

		this.transactionContext = transactionContext;
		this.fileSupport = fileSupport;
	}

	/**
	 * Chooses the invocation strategy for the given statement method.
	 * Methods not handled here are delegated to the superclass.
	 *
	 * @param statement the statement the method is invoked on
	 * @param method the invoked method
	 * @param parameters the arguments of the invocation
	 * @return the strategy with which to invoke the method
	 * @throws Exception if the strategy cannot be determined
	 * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#getInvocationStrategy(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
	 */
	@Override
	protected InvocationStrategy<D, S, ?> getInvocationStrategy(S statement, Method method, Object[] parameters) throws Exception
	{
		if (driverReadMethodSet.contains(method))
		{
			return new DriverReadInvocationStrategy<D, S, Object>();
		}

		if (driverWriteMethodSet.contains(method) || method.equals(closeMethod) || method.equals(addBatchMethod) || method.equals(clearBatchMethod))
		{
			return new DriverWriteInvocationStrategy<D, S, Object>();
		}

		if (executeMethodSet.contains(method))
		{
			// The first argument is cast to the SQL string; locks are derived from it
			List<Lock> lockList = this.extractLocks((String) parameters[0]);

			return this.transactionContext.start(new LockingInvocationStrategy<D, S, Object>(new DatabaseWriteInvocationStrategy<D, S, Object>(this.cluster.getTransactionalExecutor()), lockList), this.getParent());
		}

		if (method.equals(getConnectionMethod))
		{
			// getConnection() is answered locally with this handler's parent connection
			return new InvocationStrategy<D, S, Connection>()
			{
				public Connection invoke(SQLProxy<D, S> proxy, Invoker<D, S, Connection> invoker) throws Exception
				{
					return AbstractStatementInvocationHandler.this.getParent();
				}
			};
		}

		if (method.equals(executeQueryMethod))
		{
			String sql = (String) parameters[0];

			List<Lock> lockList = this.extractLocks(sql);

			int concurrency = statement.getResultSetConcurrency();
			boolean selectForUpdate = this.isSelectForUpdate(sql);

			// Lazy result set only when no locks are needed, the result set is read-only,
			// and the query is not a select-for-update
			if (lockList.isEmpty() && (concurrency == ResultSet.CONCUR_READ_ONLY) && !selectForUpdate)
			{
				return new LazyResultSetInvocationStrategy<D, S>(statement, this.transactionContext, this.fileSupport);
			}

			InvocationStrategy<D, S, ResultSet> strategy = new LockingInvocationStrategy<D, S, ResultSet>(new EagerResultSetInvocationStrategy<D, S>(this.cluster, statement, this.transactionContext, this.fileSupport), lockList);

			// Only a select-for-update is started through the transaction context
			return selectForUpdate ? this.transactionContext.start(strategy, this.getParent()) : strategy;
		}

		if (method.equals(executeBatchMethod))
		{
			// Locks are derived from every SQL string added to the current batch
			List<Lock> lockList = this.extractLocks(this.sqlList);

			return this.transactionContext.start(new LockingInvocationStrategy<D, S, Object>(new DatabaseWriteInvocationStrategy<D, S, Object>(this.cluster.getTransactionalExecutor()), lockList), this.getParent());
		}

		if (method.equals(getMoreResultsMethod))
		{
			if (parameters[0].equals(Statement.KEEP_CURRENT_RESULT))
			{
				return new DriverWriteInvocationStrategy<D, S, Object>();
			}
			// Any other mode falls through to the superclass default at the end of this method
		}

		if (method.equals(getResultSetMethod))
		{
			if (statement.getResultSetConcurrency() == ResultSet.CONCUR_READ_ONLY)
			{
				return new LazyResultSetInvocationStrategy<D, S>(statement, this.transactionContext, this.fileSupport);
			}

			return new EagerResultSetInvocationStrategy<D, S>(this.cluster, statement, this.transactionContext, this.fileSupport);
		}

		return super.getInvocationStrategy(statement, method, parameters);
	}

	/**
	 * Indicates whether the given method is addBatch(String), executeQuery(String),
	 * or one of the methods in the execute method set.
	 *
	 * @param method a statement method
	 * @return true if the method is one of those listed above, false otherwise
	 * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#isSQLMethod(java.lang.reflect.Method)
	 */
	@Override
	protected boolean isSQLMethod(Method method)
	{
		return method.equals(addBatchMethod) || method.equals(executeQueryMethod) || executeMethodSet.contains(method);
	}

	/**
	 * Maintains batch and child state after an invocation:
	 * addBatch appends its SQL to the batch SQL list; close closes the file support object
	 * and removes this handler from its parent proxy; clearBatch and executeBatch empty the batch SQL list.
	 *
	 * @param statement the statement the method was invoked on
	 * @param method the invoked method
	 * @param parameters the arguments of the invocation
	 * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#postInvoke(java.lang.Object, java.lang.reflect.Method, java.lang.Object[])
	 */
	@Override
	protected void postInvoke(S statement, Method method, Object[] parameters)
	{
		if (method.equals(addBatchMethod))
		{
			this.sqlList.add((String) parameters[0]);
		}
		else if (method.equals(closeMethod))
		{
			this.fileSupport.close();

			this.getParentProxy().removeChild(this);
		}
		else if (method.equals(clearBatchMethod) || method.equals(executeBatchMethod))
		{
			this.sqlList.clear();
		}
	}

	/**
	 * Handles an invocation that succeeded on some databases and failed on others.
	 * When the parent connection is in auto-commit mode, the superclass behavior applies.
	 * Otherwise, failed databases that are not alive are deactivated, and if any failed database
	 * is still alive, the exceptions of those databases are chained and thrown.
	 *
	 * @param <R> the result type of the invocation
	 * @param resultMap results keyed by the databases on which the invocation succeeded
	 * @param exceptionMap exceptions keyed by the databases on which the invocation failed
	 * @return the result map, if every failed database turned out not to be alive
	 * @throws Exception the chained exceptions of the failed databases that are still alive
	 * @see net.sf.hajdbc.sql.SQLProxy#handlePartialFailure(java.util.SortedMap, java.util.SortedMap)
	 */
	@Override
	public <R> SortedMap<Database<D>, R> handlePartialFailure(SortedMap<Database<D>, R> resultMap, SortedMap<Database<D>, Exception> exceptionMap) throws Exception
	{
		if (this.getParent().getAutoCommit())
		{
			return super.handlePartialFailure(resultMap, exceptionMap);
		}

		// If auto-commit is off, throw exception to give client the opportunity to rollback the transaction
		Map<Boolean, List<Database<D>>> aliveMap = this.cluster.getAliveMap(exceptionMap.keySet());

		List<Database<D>> aliveList = aliveMap.get(true);

		// Number of failed databases that are still alive; captured before the successful
		// databases are appended, so indexes [0, size) address only failed databases
		int size = aliveList.size();

		// Assume successful databases are alive
		aliveList.addAll(resultMap.keySet());

		this.detectClusterPanic(aliveMap);

		List<Database<D>> deadList = aliveMap.get(false);

		for (Database<D> database: deadList)
		{
			if (this.cluster.deactivate(database, this.cluster.getStateManager()))
			{
				this.logger.error(Messages.getMessage(Messages.DATABASE_DEACTIVATED, database, this.cluster), exceptionMap.get(database));
			}
		}

		// If failed databases are all dead
		if (size == 0)
		{
			return resultMap;
		}

		// Chain exceptions from alive databases
		SQLException exception = SQLExceptionFactory.createSQLException(exceptionMap.get(aliveList.get(0)));

		for (Database<D> database: aliveList.subList(1, size))
		{
			exception.setNextException(SQLExceptionFactory.createSQLException(exceptionMap.get(database)));
		}

		throw exception;
	}

	/**
	 * Indicates whether the given SQL is a select-for-update statement.
	 * The dialect is consulted only if the database properties report support for select-for-update.
	 *
	 * @param sql an SQL statement
	 * @return true if select-for-update is supported and the dialect recognizes the SQL as such, false otherwise
	 * @throws SQLException if the database properties or the dialect check fail
	 */
	protected boolean isSelectForUpdate(String sql) throws SQLException
	{
		return this.getDatabaseProperties().supportsSelectForUpdate() && this.cluster.getDialect().isSelectForUpdate(sql);
	}

	/**
	 * Computes the write locks required by a single SQL statement.
	 *
	 * @param sql an SQL statement
	 * @return a list of locks, empty if none are required
	 * @throws SQLException if the SQL could not be analyzed
	 */
	protected List<Lock> extractLocks(String sql) throws SQLException
	{
		return this.extractLocks(Collections.singletonList(sql));
	}

	/**
	 * Computes the write locks required by a list of SQL statements.
	 * One lock is created per distinct identifier: a sequence parsed from the SQL (when sequence
	 * detection is enabled) or the name of an insert target table that has identity columns
	 * (when identity column detection is enabled).
	 *
	 * @param sqlList a list of SQL statements
	 * @return a list of write locks, in the sorted order of their identifiers; empty if none are required
	 * @throws SQLException if the SQL could not be analyzed
	 */
	private List<Lock> extractLocks(List<String> sqlList) throws SQLException
	{
		// Sorted set: duplicates collapse and the locks come out in a consistent order
		Set<String> identifierSet = new TreeSet<String>();

		for (String sql: sqlList)
		{
			if (this.cluster.isSequenceDetectionEnabled())
			{
				String sequence = this.cluster.getDialect().parseSequence(sql);

				if (sequence != null)
				{
					identifierSet.add(sequence);
				}
			}

			if (this.cluster.isIdentityColumnDetectionEnabled())
			{
				String table = this.cluster.getDialect().parseInsertTable(sql);

				if (table != null)
				{
					// NOTE(review): assumes findTable never returns null for a parsed table name - confirm
					TableProperties tableProperties = this.getDatabaseProperties().findTable(table);

					if (!tableProperties.getIdentityColumns().isEmpty())
					{
						identifierSet.add(tableProperties.getName());
					}
				}
			}
		}

		List<Lock> lockList = new ArrayList<Lock>(identifierSet.size());

		if (!identifierSet.isEmpty())
		{
			LockManager lockManager = this.cluster.getLockManager();

			for (String identifier: identifierSet)
			{
				lockList.add(lockManager.writeLock(identifier));
			}
		}

		return lockList;
	}

	/**
	 * Looks up the database properties for the parent connection in the cluster's meta data cache.
	 *
	 * @return the database properties obtained from the cache
	 * @throws SQLException if the properties could not be obtained
	 */
	protected DatabaseProperties getDatabaseProperties() throws SQLException
	{
		return this.cluster.getDatabaseMetaDataCache().getDatabaseProperties(this.getParent());
	}

	/**
	 * Closes the given statement.
	 *
	 * @param connection the parent connection (not used by this implementation)
	 * @param statement the statement to close
	 * @throws SQLException if the statement could not be closed
	 * @see net.sf.hajdbc.sql.AbstractChildInvocationHandler#close(java.lang.Object, java.lang.Object)
	 */
	@Override
	protected void close(Connection connection, S statement) throws SQLException
	{
		statement.close();
	}

	/**
	 * Records an invocation.
	 * Batch methods are remembered in the batch invoker list, end-of-batch methods empty that list,
	 * and every other method is recorded by the superclass.
	 *
	 * @param invoker the invoker of the method
	 * @param method the invoked method
	 * @param parameters the arguments of the invocation
	 * @see net.sf.hajdbc.sql.AbstractInvocationHandler#record(net.sf.hajdbc.sql.Invoker, java.lang.reflect.Method, java.lang.Object[])
	 */
	@Override
	protected void record(Invoker<D, S, ?> invoker, Method method, Object[] parameters)
	{
		if (this.isBatchMethod(method))
		{
			synchronized (this.invokerList)
			{
				this.invokerList.add(invoker);
			}
		}
		else if (this.isEndBatchMethod(method))
		{
			synchronized (this.invokerList)
			{
				this.invokerList.clear();
			}
		}
		else
		{
			super.record(invoker, method, parameters);
		}
	}

	/**
	 * Indicates whether the given method is a member of the driver write method set.
	 *
	 * @param method a statement method
	 * @return true if the method is in the driver write method set, false otherwise
	 * @see net.sf.hajdbc.sql.AbstractInvocationHandler#isRecordable(java.lang.reflect.Method)
	 */
	@Override
	protected boolean isRecordable(Method method)
	{
		return driverWriteMethodSet.contains(method);
	}

	/**
	 * Indicates whether the given method adds to the current batch.
	 *
	 * @param method a statement method
	 * @return true if the method is addBatch(String), false otherwise
	 */
	protected boolean isBatchMethod(Method method)
	{
		return method.equals(addBatchMethod);
	}

	/**
	 * Indicates whether the given method ends the current batch.
	 *
	 * @param method a statement method
	 * @return true if the method is clearBatch() or executeBatch(), false otherwise
	 */
	protected boolean isEndBatchMethod(Method method)
	{
		return method.equals(clearBatchMethod) || method.equals(executeBatchMethod);
	}

	/**
	 * Replays recorded invocations against the given statement:
	 * first whatever the superclass replays, then every invoker in the batch invoker list, in list order.
	 *
	 * @param database the database to which the statement belongs
	 * @param statement the statement on which to replay the invocations
	 * @throws Exception if any replayed invocation fails
	 * @see net.sf.hajdbc.sql.AbstractInvocationHandler#replay(net.sf.hajdbc.Database, java.lang.Object)
	 */
	@Override
	protected void replay(Database<D> database, S statement) throws Exception
	{
		super.replay(database, statement);

		synchronized (this.invokerList)
		{
			for (Invoker<D, S, ?> invoker: this.invokerList)
			{
				invoker.invoke(database, statement);
			}
		}
	}
}