MySQL JDBC驱动源码分析(四):Statement与ResultSet的创建
一、连接创建完成后,接着创建Statement执行SQL查询,并返回相应的ResultSet。
首先进入ConnectionImpl类的createStatement(..)方法:
?
?
public java.sql.Statement createStatement(int resultSetType,int resultSetConcurrency, int resultSetHoldability)throws SQLException { //对返回的结果集进行指定相应的模式功能,可参照ResultSet的常量设置if (getPedantic()) {if (resultSetHoldability != java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT) {throw SQLError.createSQLException("HOLD_CUSRORS_OVER_COMMIT is only supported holdability level",SQLError.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());}}return createStatement(resultSetType, resultSetConcurrency);}public java.sql.Statement createStatement(int resultSetType,int resultSetConcurrency) throws SQLException {checkClosed(); //getLoadBalanceSafeProxy() 为相应的连接StatementImpl stmt = new StatementImpl(getLoadBalanceSafeProxy(), this.database);stmt.setResultSetType(resultSetType);stmt.setResultSetConcurrency(resultSetConcurrency);return stmt;}?
接着进入StatementImpl.executeQuery(..)方法:
?
public synchronized java.sql.ResultSet executeQuery(String sql)throws SQLException {checkClosed();//参数所传的连接对象MySQLConnection locallyScopedConn = this.connection;synchronized (locallyScopedConn) {this.retrieveGeneratedKeys = false;resetCancelledState();checkNullOrEmptyQuery(sql);//We only stream result sets when they are forward-only, read-only, and thefetch size has been set to Integer.MIN_VALUEboolean doStreaming = createStreamingResultSet();if (doStreaming&& this.connection.getNetTimeoutForStreamingResults() > 0) {executeSimpleNonQuery(locallyScopedConn, "SET net_write_timeout="+ this.connection.getNetTimeoutForStreamingResults());}if (this.doEscapeProcessing) {Object escapedSqlResult = EscapeProcessor.escapeSQL(sql,locallyScopedConn.serverSupportsConvertFn(), this.connection);if (escapedSqlResult instanceof String) {sql = (String) escapedSqlResult;} else {sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;}}char firstStatementChar = StringUtils.firstNonWsCharUc(sql,findStartOfStatement(sql));if (sql.charAt(0) == '/') {if (sql.startsWith(PING_MARKER)) {doPingInstead();return this.results;}}checkForDml(sql, firstStatementChar);if (this.results != null) {if (!locallyScopedConn.getHoldResultsOpenOverStatementClose()) {this.results.realClose(false);}}CachedResultSetMetaData cachedMetaData = null;// If there isn't a limit clause in the SQL// then limit the number of rows to return in// an efficient manner. 
Only do this if// setMaxRows() hasn't been used on any Statements// generated from the current Connection (saves// a query, and network traffic).if (useServerFetch()) {this.results = createResultSetUsingServerFetch(sql);return this.results;}CancelTask timeoutTask = null;String oldCatalog = null;try {if (locallyScopedConn.getEnableQueryTimeouts() &&this.timeoutInMillis != 0&& locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {timeoutTask = new CancelTask(this);locallyScopedConn.getCancelTimer().schedule(timeoutTask,this.timeoutInMillis);}if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {oldCatalog = locallyScopedConn.getCatalog();locallyScopedConn.setCatalog(this.currentCatalog);}//// Check if we have cached metadata for this query...//Field[] cachedFields = null;//是否应用缓存ResultSetif (locallyScopedConn.getCacheResultSetMetadata()) {cachedMetaData = locallyScopedConn.getCachedMetaData(sql);if (cachedMetaData != null) {cachedFields = cachedMetaData.fields;}}if (locallyScopedConn.useMaxRows()) {// We need to execute this all together// So synchronize on the Connection's mutex (because// even queries going through there synchronize// on the connectionif (StringUtils.indexOfIgnoreCase(sql, "LIMIT") != -1) { //$NON-NLS-1$this.results = locallyScopedConn.execSQL(this, sql,this.maxRows, null, this.resultSetType,this.resultSetConcurrency,doStreaming,this.currentCatalog, cachedFields);} else {if (this.maxRows <= 0) {executeSimpleNonQuery(locallyScopedConn,"SET OPTION SQL_SELECT_LIMIT=DEFAULT");} else {executeSimpleNonQuery(locallyScopedConn,"SET OPTION SQL_SELECT_LIMIT=" + this.maxRows);}statementBegins();this.results = locallyScopedConn.execSQL(this, sql, -1,null, this.resultSetType,this.resultSetConcurrency,doStreaming,this.currentCatalog, cachedFields);if (oldCatalog != null) {locallyScopedConn.setCatalog(oldCatalog);}}} else {statementBegins();//ConnectionImpl中执行sql语句this.results = locallyScopedConn.execSQL(this, sql, -1, null,this.resultSetType, 
this.resultSetConcurrency,doStreaming,this.currentCatalog, cachedFields);}if (timeoutTask != null) {if (timeoutTask.caughtWhileCancelling != null) {throw timeoutTask.caughtWhileCancelling;}timeoutTask.cancel();locallyScopedConn.getCancelTimer().purge();timeoutTask = null;}synchronized (this.cancelTimeoutMutex) {if (this.wasCancelled) {SQLException cause = null;if (this.wasCancelledByTimeout) {cause = new MySQLTimeoutException();} else {cause = new MySQLStatementCancelledException();}resetCancelledState();throw cause;}}} finally {this.statementExecuting.set(false);if (timeoutTask != null) {timeoutTask.cancel();locallyScopedConn.getCancelTimer().purge();}if (oldCatalog != null) {locallyScopedConn.setCatalog(oldCatalog);}}this.lastInsertId = this.results.getUpdateID();if (cachedMetaData != null) {locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData,this.results);} else {if (this.connection.getCacheResultSetMetadata()) {locallyScopedConn.initializeResultsMetadataFromCache(sql,null /* will be created */, this.results);}}return this.results;}}
进入ConnectionImpl.execSQL(..)方法:
?
public synchronized ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows,Buffer packet, int resultSetType, int resultSetConcurrency,boolean streamResults, String catalog,Field[] cachedMetadata,boolean isBatch) throws SQLException {long queryStartTime = 0;int endOfQueryPacketPosition = 0;if (packet != null) {endOfQueryPacketPosition = packet.getPosition();}if (getGatherPerformanceMetrics()) {queryStartTime = System.currentTimeMillis();}this.lastQueryFinishedTime = 0; // we're busy!if ((getHighAvailability())&& (this.autoCommit || getAutoReconnectForPools())&& this.needsPing && !isBatch) {try {pingInternal(false, 0);this.needsPing = false;} catch (Exception Ex) {createNewIO(true);}}try {if (packet == null) {String encoding = null;if (getUseUnicode()) {encoding = getEncoding();}//进入MysqlIO中执行查询操作return this.io.sqlQueryDirect(callingStatement, sql,encoding, null, maxRows, resultSetType,resultSetConcurrency, streamResults, catalog,cachedMetadata);}return this.io.sqlQueryDirect(callingStatement, null, null,packet, maxRows, resultSetType,resultSetConcurrency, streamResults, catalog,cachedMetadata);} catch (java.sql.SQLException sqlE) {// don't clobber SQL exceptionsif (getDumpQueriesOnException()) {String extractedSql = extractSqlFromPacket(sql, packet,endOfQueryPacketPosition);StringBuffer messageBuf = new StringBuffer(extractedSql.length() + 32);messageBuf.append("\n\nQuery being executed when exception was thrown:\n");messageBuf.append(extractedSql);messageBuf.append("\n\n");sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());}if ((getHighAvailability())) {this.needsPing = true;} else {String sqlState = sqlE.getSQLState();if ((sqlState != null)&& sqlState.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {cleanup(sqlE);}}throw sqlE;} catch (Exception ex) {if (getHighAvailability()) {this.needsPing = true;} else if (ex instanceof IOException) {cleanup(ex);}SQLException sqlEx = 
SQLError.createSQLException(Messages.getString("Connection.UnexpectedException"),SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());sqlEx.initCause(ex);throw sqlEx;} finally {if (getMaintainTimeStats()) {this.lastQueryFinishedTime = System.currentTimeMillis();}if (getGatherPerformanceMetrics()) {long queryTime = System.currentTimeMillis()- queryStartTime;registerQueryExecutionTime(queryTime);}}}?
?
进入MysqlIO.sqlQueryDirect(..)方法,向服务器发送查询并生成ResultSet:
?
final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception { this.statementExecutionDepth++; try { if (this.statementInterceptors != null) { ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPre(query, callingStatement, false); if (interceptedResults != null) { return interceptedResults; } } long queryStartTime = 0; long queryEndTime = 0; String statementComment = this.connection.getStatementComment(); if (this.connection.getIncludeThreadNamesAsStatementComment()) { statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName(); } if (query != null) { // We don't know exactly how many bytes we're going to get // from the query. Since we're dealing with Unicode, the // max is 2, so pad it (2 * query) + space for headers int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2; byte[] commentAsBytes = null; if (statementComment != null) { commentAsBytes = StringUtils.getBytes(statementComment, null, characterEncoding, this.connection .getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), getExceptionInterceptor()); packLength += commentAsBytes.length; packLength += 6; // for /*[space] [space]*/ }//sendPacket封装数据的包,发送至服务器 if (this.sendPacket == null) { this.sendPacket = new Buffer(packLength); } else { this.sendPacket.clear(); } this.sendPacket.writeByte((byte) MysqlDefs.QUERY); if (commentAsBytes != null) { this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES); this.sendPacket.writeBytesNoNull(commentAsBytes); this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES); } if (characterEncoding != null) { if (this.platformDbCharsetMatches) { this.sendPacket.writeStringNoNull(query, characterEncoding, 
this.connection.getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), this.connection); } else { if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { //$NON-NLS-1$ this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query)); } else { this.sendPacket.writeStringNoNull(query, characterEncoding, this.connection.getServerCharacterEncoding(), this.connection.parserKnowsUnicode(), this.connection); } } } else { this.sendPacket.writeStringNoNull(query); } //赋值 queryPacket = this.sendPacket; } byte[] queryBuf = null; int oldPacketPosition = 0; if (needToGrabQueryFromPacket) { queryBuf = queryPacket.getByteBuffer(); // save the packet position oldPacketPosition = queryPacket.getPosition(); queryStartTime = getCurrentTimeNanosOrMillis(); } if (this.autoGenerateTestcaseScript) { String testcaseQuery = null; if (query != null) { if (statementComment != null) { testcaseQuery = "/* " + statementComment + " */ " + query; } else { testcaseQuery = query; } } else { testcaseQuery = StringUtils.toString(queryBuf, 5, (oldPacketPosition - 5)); } StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32); this.connection.generateConnectionCommentBlock(debugBuf); debugBuf.append(testcaseQuery); debugBuf.append(';'); this.connection.dumpTestcaseQuery(debugBuf.toString()); } // Send query command and sql query string //发送查询命与sql查询语句,并得到查询结果(socket处理) Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0); long fetchBeginTime = 0; long fetchEndTime = 0; String profileQueryToLog = null; boolean queryWasSlow = false; if (this.profileSql || this.logSlowQueries) { queryEndTime = System.currentTimeMillis(); boolean shouldExtractQuery = false; if (this.profileSql) { shouldExtractQuery = true; } else if (this.logSlowQueries) { long queryTime = queryEndTime - queryStartTime; boolean logSlow = false; if (this.useAutoSlowLog) { logSlow = queryTime > this.connection.getSlowQueryThresholdMillis(); } else { logSlow = 
this.connection.isAbonormallyLongQuery(queryTime); this.connection.reportQueryTime(queryTime); } if (logSlow) { shouldExtractQuery = true; queryWasSlow = true; } } if (shouldExtractQuery) { // Extract the actual query from the network packet boolean truncated = false; int extractPosition = oldPacketPosition; if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) { extractPosition = this.connection.getMaxQuerySizeToLog() + 5; truncated = true; } profileQueryToLog = StringUtils.toString(queryBuf, 5, (extractPosition - 5)); if (truncated) { profileQueryToLog += Messages.getString("MysqlIO.25"); //$NON-NLS-1$ } } fetchBeginTime = queryEndTime; } //封装成ResultSet ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, false, -1L, cachedMetadata); if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) { StringBuffer mesgBuf = new StringBuffer(48 + profileQueryToLog.length()); mesgBuf.append(Messages.getString("MysqlIO.SlowQuery", new Object[] {Long.valueOf(this.slowQueryThreshold), queryTimingUnits, Long.valueOf(queryEndTime - queryStartTime)})); mesgBuf.append(profileQueryToLog); ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, this.connection.getId(), //$NON-NLS-1$ (callingStatement != null) ? 
callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (int) (queryEndTime - queryStartTime), queryTimingUnits, null, new Throwable(), mesgBuf.toString())); if (this.connection.getExplainSlowQueries()) { if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) { explainSlowQuery(queryPacket.getBytes(5, (oldPacketPosition - 5)), profileQueryToLog); } else { this.connection.getLog().logWarn(Messages.getString( "MysqlIO.28") //$NON-NLS-1$ +MAX_QUERY_SIZE_TO_EXPLAIN + Messages.getString("MysqlIO.29")); //$NON-NLS-1$ } } } if (this.logSlowQueries) { ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); if (this.queryBadIndexUsed && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent( ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$ this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, new Throwable(), Messages.getString("MysqlIO.33") //$NON-NLS-1$ +profileQueryToLog)); } if (this.queryNoIndexUsed && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent( ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$ this.connection.getId(), (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, new Throwable(), Messages.getString("MysqlIO.35") //$NON-NLS-1$ +profileQueryToLog)); } if (this.serverQueryWasSlow && this.profileSql) { eventSink.consumeEvent(new ProfilerEvent( ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$ this.connection.getId(), (callingStatement != null) ? 
callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, new Throwable(), Messages.getString("MysqlIO.ServerSlowQuery") //$NON-NLS-1$ +profileQueryToLog)); } } if (this.profileSql) { fetchEndTime = getCurrentTimeNanosOrMillis(); ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY, "", catalog, this.connection.getId(), //$NON-NLS-1$ (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (queryEndTime - queryStartTime), this.queryTimingUnits, null, new Throwable(), profileQueryToLog)); eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH, "", catalog, this.connection.getId(), //$NON-NLS-1$ (callingStatement != null) ? callingStatement.getId() : 999, ((ResultSetImpl)rs).resultId, System.currentTimeMillis(), (fetchEndTime - fetchBeginTime), this.queryTimingUnits, null, new Throwable(), null)); } if (this.hadWarnings) { scanForAndThrowDataTruncation(); } if (this.statementInterceptors != null) { ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost( query, callingStatement, rs, false, null); if (interceptedResults != null) { rs = interceptedResults; } } return rs; } catch (SQLException sqlEx) { if (this.statementInterceptors != null) { invokeStatementInterceptorsPost( query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case } if (callingStatement != null) { synchronized (callingStatement.cancelTimeoutMutex) { if (callingStatement.wasCancelled) {SQLException cause = null;if (callingStatement.wasCancelledByTimeout) {cause = new MySQLTimeoutException();} else {cause = new MySQLStatementCancelledException();}callingStatement.resetCancelledState();throw cause;} } } throw sqlEx; } finally { this.statementExecutionDepth--; } }
进入MysqlIO.readAllResults(..)方法,解析服务器返回的数据并生成ResultSet:
ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException { //设置指针 resultPacket.setPosition(resultPacket.getPosition() - 1); //读取第一条数据 ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache); ResultSetImpl currentResultSet = topLevelResultSet; boolean checkForMoreResults = ((this.clientParam & CLIENT_MULTI_RESULTS) != 0); boolean serverHasMoreResults = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0; // // TODO: We need to support streaming of multiple result sets // if (serverHasMoreResults && streamResults) { //clearInputStream();// //throw SQLError.createSQLException(Messages.getString("MysqlIO.23"), //$NON-NLS-1$ //SQLError.SQL_STATE_DRIVER_NOT_CAPABLE); if (topLevelResultSet.getUpdateCount() != -1) { tackOnMoreStreamingResults(topLevelResultSet); } reclaimLargeReusablePacket(); return topLevelResultSet; } boolean moreRowSetsExist = checkForMoreResults & serverHasMoreResults; while (moreRowSetsExist) { Buffer fieldPacket = checkErrorPacket(); fieldPacket.setPosition(0); ResultSetImpl newResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, fieldPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache); currentResultSet.setNextResultSet(newResultSet); currentResultSet = newResultSet; moreRowSetsExist = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0; } if (!streamResults) { clearInputStream(); } reclaimLargeReusablePacket(); return topLevelResultSet; }============================= protected final ResultSetImpl readResultsForQueryOrUpdate( StatementImpl callingStatement, int maxRows, int 
resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException { long columnCount = resultPacket.readFieldLength(); if (columnCount == 0) { return buildResultSetWithUpdates(callingStatement, resultPacket); } else if (columnCount == Buffer.NULL_LENGTH) { String charEncoding = null; if (this.connection.getUseUnicode()) { charEncoding = this.connection.getEncoding(); } String fileName = null; if (this.platformDbCharsetMatches) { fileName = ((charEncoding != null) ? resultPacket.readString(charEncoding, getExceptionInterceptor()) : resultPacket.readString()); } else { fileName = resultPacket.readString(); } return sendFileToServer(callingStatement, fileName); } else { //获取结果集 com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, isBinaryEncoded, metadataFromCache); return results; } }========================= //对数据进行解析封装生成结果集 protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException { Buffer packet; // The packet from the server //字段数组 Field[] fields = null; // Read in the column information if (metadataFromCache == null /* we want the metadata from the server */) { fields = new Field[(int) columnCount]; for (int i = 0; i < columnCount; i++) { Buffer fieldPacket = null; //循环处理 fieldPacket = readPacket(); fields[i] = unpackField(fieldPacket, false); } } else { for (int i = 0; i < columnCount; i++) { skipPacket(); } } // packet = reuseAndReadPacket(this.reusablePacket); readServerStatusForResultSets(packet);//// Handle cursor-based fetch first//if (this.connection.versionMeetsMinimum(5, 0, 2)&& this.connection.getUseCursorFetch()&& 
isBinaryEncoded&& callingStatement != null&& callingStatement.getFetchSize() != 0&& callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;boolean usingCursor = true;//// Server versions 5.0.5 or newer will only open// a cursor and set this flag if they can, otherwise// they punt and go back to mysql_store_results() behavior//if (this.connection.versionMeetsMinimum(5, 0, 5)) {usingCursor = (this.serverStatus &SERVER_STATUS_CURSOR_EXISTS) != 0;}if (usingCursor) {RowData rows = new RowDataCursor(this,prepStmt,fields);ResultSetImpl rs = buildResultSetWithRows(callingStatement,catalog,fields,rows, resultSetType, resultSetConcurrency, isBinaryEncoded);if (usingCursor) { rs.setFetchSize(callingStatement.getFetchSize()); }return rs;}} RowData rowData = null; if (!streamResults) { //封装成RowData的数据 rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache); } else { rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded); this.streamingData = rowData; } //创建ResultSetImpl对象 ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? 
fields : metadataFromCache, rowData, resultSetType, resultSetConcurrency, isBinaryEncoded); return rs; }================================== //对rowData的封装 private RowData readSingleRowSet(long columnCount, int maxRows, int resultSetConcurrency, boolean isBinaryEncoded, Field[] fields) throws SQLException { RowData rowData; ArrayList rows = new ArrayList(); boolean useBufferRowExplicit = useBufferRowExplicit(fields); // Now read the data //读取数据 ResultSetRow row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, false, null); int rowCount = 0; if (row != null) { rows.add(row); rowCount = 1; } while (row != null) { //读取全部数据,封装至list row = nextRow(fields, (int) columnCount, isBinaryEncoded, resultSetConcurrency, false, useBufferRowExplicit, false, null); if (row != null) { if ((maxRows == -1) || (rowCount < maxRows)) { rows.add(row); rowCount++; } } } rowData = new RowDataStatic(rows); return rowData; }============================= //创建ResultSetImpl对象 private com.mysql.jdbc.ResultSetImpl buildResultSetWithRows( StatementImpl callingStatement, String catalog, com.mysql.jdbc.Field[] fields, RowData rows, int resultSetType, int resultSetConcurrency, boolean isBinaryEncoded) throws SQLException { ResultSetImpl rs = null; //根据所传的resultSet常量参数生成相应模式的ResultSetImpl switch (resultSetConcurrency) { case java.sql.ResultSet.CONCUR_READ_ONLY: rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false); if (isBinaryEncoded) { rs.setBinaryEncoded(); } break; case java.sql.ResultSet.CONCUR_UPDATABLE: rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, true); break; default: return com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows, this.connection, callingStatement, false); } rs.setResultSetType(resultSetType); rs.setResultSetConcurrency(resultSetConcurrency); return rs; }?
?
?
?
?
?
?
?
?
?