首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 数据库 > 其他数据库 >

Mysql JDBC驱动源码分析(Statement,ResultSet的创办)四

2012-07-16 
Mysql JDBC驱动源码分析(Statement,ResultSet的创建)四一,当连接创建完成时,接着就创建Statement进行sql查

Mysql JDBC驱动源码分析(Statement,ResultSet的创建)四

一,当连接创建完成时,接着就创建Statement进行sql查询,并返回相应的ResultSet

进入ConnectionImpl类下的,createStatement(..)

?

?

public java.sql.Statement createStatement(int resultSetType,int resultSetConcurrency, int resultSetHoldability)throws SQLException {               //对返回的结果集进行指定相应的模式功能,可参照ResultSet的常量设置if (getPedantic()) {if (resultSetHoldability != java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT) {throw SQLError.createSQLException("HOLD_CUSRORS_OVER_COMMIT is only supported holdability level",SQLError.SQL_STATE_ILLEGAL_ARGUMENT, getExceptionInterceptor());}}return createStatement(resultSetType, resultSetConcurrency);}public java.sql.Statement createStatement(int resultSetType,int resultSetConcurrency) throws SQLException {checkClosed();                //getLoadBalanceSafeProxy() 为相应的连接StatementImpl stmt = new StatementImpl(getLoadBalanceSafeProxy(), this.database);stmt.setResultSetType(resultSetType);stmt.setResultSetConcurrency(resultSetConcurrency);return stmt;}
?

进入StatementImpl .executeQuery(..)

?

public synchronized java.sql.ResultSet executeQuery(String sql)throws SQLException {checkClosed();//参数所传的连接对象MySQLConnection locallyScopedConn = this.connection;synchronized (locallyScopedConn) {this.retrieveGeneratedKeys = false;resetCancelledState();checkNullOrEmptyQuery(sql);//We only stream result sets when they are forward-only, read-only, and thefetch size has been set to Integer.MIN_VALUEboolean doStreaming = createStreamingResultSet();if (doStreaming&& this.connection.getNetTimeoutForStreamingResults() > 0) {executeSimpleNonQuery(locallyScopedConn, "SET net_write_timeout="+ this.connection.getNetTimeoutForStreamingResults());}if (this.doEscapeProcessing) {Object escapedSqlResult = EscapeProcessor.escapeSQL(sql,locallyScopedConn.serverSupportsConvertFn(), this.connection);if (escapedSqlResult instanceof String) {sql = (String) escapedSqlResult;} else {sql = ((EscapeProcessorResult) escapedSqlResult).escapedSql;}}char firstStatementChar = StringUtils.firstNonWsCharUc(sql,findStartOfStatement(sql));if (sql.charAt(0) == '/') {if (sql.startsWith(PING_MARKER)) {doPingInstead();return this.results;}}checkForDml(sql, firstStatementChar);if (this.results != null) {if (!locallyScopedConn.getHoldResultsOpenOverStatementClose()) {this.results.realClose(false);}}CachedResultSetMetaData cachedMetaData = null;// If there isn't a limit clause in the SQL// then limit the number of rows to return in// an efficient manner. Only do this if// setMaxRows() hasn't been used on any Statements// generated from the current Connection (saves// a query, and network traffic).if (useServerFetch()) {this.results = createResultSetUsingServerFetch(sql);return this.results;}CancelTask timeoutTask = null;String oldCatalog = null;try {if (locallyScopedConn.getEnableQueryTimeouts() &&this.timeoutInMillis != 0&& locallyScopedConn.versionMeetsMinimum(5, 0, 0)) {timeoutTask = new CancelTask(this);locallyScopedConn.getCancelTimer().schedule(timeoutTask,this.timeoutInMillis);}if (!locallyScopedConn.getCatalog().equals(this.currentCatalog)) {oldCatalog = locallyScopedConn.getCatalog();locallyScopedConn.setCatalog(this.currentCatalog);}//// Check if we have cached metadata for this query...//Field[] cachedFields = null;//是否应用缓存ResultSetif (locallyScopedConn.getCacheResultSetMetadata()) {cachedMetaData = locallyScopedConn.getCachedMetaData(sql);if (cachedMetaData != null) {cachedFields = cachedMetaData.fields;}}if (locallyScopedConn.useMaxRows()) {// We need to execute this all together// So synchronize on the Connection's mutex (because// even queries going through there synchronize// on the connectionif (StringUtils.indexOfIgnoreCase(sql, "LIMIT") != -1) { //$NON-NLS-1$this.results = locallyScopedConn.execSQL(this, sql,this.maxRows, null, this.resultSetType,this.resultSetConcurrency,doStreaming,this.currentCatalog, cachedFields);} else {if (this.maxRows <= 0) {executeSimpleNonQuery(locallyScopedConn,"SET OPTION SQL_SELECT_LIMIT=DEFAULT");} else {executeSimpleNonQuery(locallyScopedConn,"SET OPTION SQL_SELECT_LIMIT=" + this.maxRows);}statementBegins();this.results = locallyScopedConn.execSQL(this, sql, -1,null, this.resultSetType,this.resultSetConcurrency,doStreaming,this.currentCatalog, cachedFields);if (oldCatalog != null) {locallyScopedConn.setCatalog(oldCatalog);}}} else {statementBegins();//ConnectionImpl中执行sql语句this.results = locallyScopedConn.execSQL(this, sql, -1, null,this.resultSetType, this.resultSetConcurrency,doStreaming,this.currentCatalog, cachedFields);}if (timeoutTask != null) {if (timeoutTask.caughtWhileCancelling != null) {throw timeoutTask.caughtWhileCancelling;}timeoutTask.cancel();locallyScopedConn.getCancelTimer().purge();timeoutTask = null;}synchronized (this.cancelTimeoutMutex) {if (this.wasCancelled) {SQLException cause = null;if (this.wasCancelledByTimeout) {cause = new MySQLTimeoutException();} else {cause = new MySQLStatementCancelledException();}resetCancelledState();throw cause;}}} finally {this.statementExecuting.set(false);if (timeoutTask != null) {timeoutTask.cancel();locallyScopedConn.getCancelTimer().purge();}if (oldCatalog != null) {locallyScopedConn.setCatalog(oldCatalog);}}this.lastInsertId = this.results.getUpdateID();if (cachedMetaData != null) {locallyScopedConn.initializeResultsMetadataFromCache(sql, cachedMetaData,this.results);} else {if (this.connection.getCacheResultSetMetadata()) {locallyScopedConn.initializeResultsMetadataFromCache(sql,null /* will be created */, this.results);}}return this.results;}}

? 进入ConnectionImpl.execSQl(..)

?

public synchronized ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows,Buffer packet, int resultSetType, int resultSetConcurrency,boolean streamResults, String catalog,Field[] cachedMetadata,boolean isBatch) throws SQLException {long queryStartTime = 0;int endOfQueryPacketPosition = 0;if (packet != null) {endOfQueryPacketPosition = packet.getPosition();}if (getGatherPerformanceMetrics()) {queryStartTime = System.currentTimeMillis();}this.lastQueryFinishedTime = 0; // we're busy!if ((getHighAvailability())&& (this.autoCommit || getAutoReconnectForPools())&& this.needsPing && !isBatch) {try {pingInternal(false, 0);this.needsPing = false;} catch (Exception Ex) {createNewIO(true);}}try {if (packet == null) {String encoding = null;if (getUseUnicode()) {encoding = getEncoding();}//进入MysqlIO中执行查询操作return this.io.sqlQueryDirect(callingStatement, sql,encoding, null, maxRows, resultSetType,resultSetConcurrency, streamResults, catalog,cachedMetadata);}return this.io.sqlQueryDirect(callingStatement, null, null,packet, maxRows, resultSetType,resultSetConcurrency, streamResults, catalog,cachedMetadata);} catch (java.sql.SQLException sqlE) {// don't clobber SQL exceptionsif (getDumpQueriesOnException()) {String extractedSql = extractSqlFromPacket(sql, packet,endOfQueryPacketPosition);StringBuffer messageBuf = new StringBuffer(extractedSql.length() + 32);messageBuf.append("\n\nQuery being executed when exception was thrown:\n");messageBuf.append(extractedSql);messageBuf.append("\n\n");sqlE = appendMessageToException(sqlE, messageBuf.toString(), getExceptionInterceptor());}if ((getHighAvailability())) {this.needsPing = true;} else {String sqlState = sqlE.getSQLState();if ((sqlState != null)&& sqlState.equals(SQLError.SQL_STATE_COMMUNICATION_LINK_FAILURE)) {cleanup(sqlE);}}throw sqlE;} catch (Exception ex) {if (getHighAvailability()) {this.needsPing = true;} else if (ex instanceof IOException) {cleanup(ex);}SQLException sqlEx = SQLError.createSQLException(Messages.getString("Connection.UnexpectedException"),SQLError.SQL_STATE_GENERAL_ERROR, getExceptionInterceptor());sqlEx.initCause(ex);throw sqlEx;} finally {if (getMaintainTimeStats()) {this.lastQueryFinishedTime = System.currentTimeMillis();}if (getGatherPerformanceMetrics()) {long queryTime = System.currentTimeMillis()- queryStartTime;registerQueryExecutionTime(queryTime);}}}
?

?

进入MysqlIO.sqlQueryDirect(..)中执行生成ResultSet

?

  final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query,    String characterEncoding, Buffer queryPacket, int maxRows,    int resultSetType, int resultSetConcurrency,    boolean streamResults, String catalog, Field[] cachedMetadata)    throws Exception {    this.statementExecutionDepth++;    try {    if (this.statementInterceptors != null) {    ResultSetInternalMethods interceptedResults =    invokeStatementInterceptorsPre(query, callingStatement, false);    if (interceptedResults != null) {    return interceptedResults;    }    }    long queryStartTime = 0;    long queryEndTime = 0;    String statementComment = this.connection.getStatementComment();        if (this.connection.getIncludeThreadNamesAsStatementComment()) {    statementComment = (statementComment != null ? statementComment + ", " : "") + "java thread: " + Thread.currentThread().getName();    }        if (query != null) {    // We don't know exactly how many bytes we're going to get    // from the query. Since we're dealing with Unicode, the    // max is 2, so pad it (2 * query) + space for headers    int packLength = HEADER_LENGTH + 1 + (query.length() * 2) + 2;    byte[] commentAsBytes = null;    if (statementComment != null) {    commentAsBytes = StringUtils.getBytes(statementComment, null,    characterEncoding, this.connection    .getServerCharacterEncoding(),    this.connection.parserKnowsUnicode(), getExceptionInterceptor());    packLength += commentAsBytes.length;    packLength += 6; // for /*[space] [space]*/    }//sendPacket封装数据的包,发送至服务器    if (this.sendPacket == null) {    this.sendPacket = new Buffer(packLength);    } else {    this.sendPacket.clear();    }    this.sendPacket.writeByte((byte) MysqlDefs.QUERY);    if (commentAsBytes != null) {    this.sendPacket.writeBytesNoNull(Constants.SLASH_STAR_SPACE_AS_BYTES);    this.sendPacket.writeBytesNoNull(commentAsBytes);    this.sendPacket.writeBytesNoNull(Constants.SPACE_STAR_SLASH_SPACE_AS_BYTES);    }    if (characterEncoding != null) {    if (this.platformDbCharsetMatches) {    this.sendPacket.writeStringNoNull(query, characterEncoding,    this.connection.getServerCharacterEncoding(),    this.connection.parserKnowsUnicode(),    this.connection);    } else {    if (StringUtils.startsWithIgnoreCaseAndWs(query, "LOAD DATA")) { //$NON-NLS-1$    this.sendPacket.writeBytesNoNull(StringUtils.getBytes(query));    } else {    this.sendPacket.writeStringNoNull(query,    characterEncoding,    this.connection.getServerCharacterEncoding(),    this.connection.parserKnowsUnicode(),    this.connection);    }    }    } else {    this.sendPacket.writeStringNoNull(query);    }                         //赋值    queryPacket = this.sendPacket;    }    byte[] queryBuf = null;    int oldPacketPosition = 0;    if (needToGrabQueryFromPacket) {    queryBuf = queryPacket.getByteBuffer();    // save the packet position    oldPacketPosition = queryPacket.getPosition();    queryStartTime = getCurrentTimeNanosOrMillis();    }        if (this.autoGenerateTestcaseScript) {    String testcaseQuery = null;    if (query != null) {    if (statementComment != null) {    testcaseQuery = "/* " + statementComment + " */ " + query;    } else {    testcaseQuery = query;    }    } else {    testcaseQuery = StringUtils.toString(queryBuf, 5,    (oldPacketPosition - 5));    }    StringBuffer debugBuf = new StringBuffer(testcaseQuery.length() + 32);    this.connection.generateConnectionCommentBlock(debugBuf);    debugBuf.append(testcaseQuery);    debugBuf.append(';');    this.connection.dumpTestcaseQuery(debugBuf.toString());    }    // Send query command and sql query string                //发送查询命与sql查询语句,并得到查询结果(socket处理)    Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket,    false, null, 0);    long fetchBeginTime = 0;    long fetchEndTime = 0;    String profileQueryToLog = null;    boolean queryWasSlow = false;    if (this.profileSql || this.logSlowQueries) {    queryEndTime = System.currentTimeMillis();    boolean shouldExtractQuery = false;    if (this.profileSql) {    shouldExtractQuery = true;    } else if (this.logSlowQueries) {    long queryTime = queryEndTime - queryStartTime;        boolean logSlow = false;        if (this.useAutoSlowLog) {    logSlow = queryTime > this.connection.getSlowQueryThresholdMillis();    } else {    logSlow = this.connection.isAbonormallyLongQuery(queryTime);        this.connection.reportQueryTime(queryTime);    }        if (logSlow) {    shouldExtractQuery = true;    queryWasSlow = true;    }    }    if (shouldExtractQuery) {    // Extract the actual query from the network packet    boolean truncated = false;    int extractPosition = oldPacketPosition;    if (oldPacketPosition > this.connection.getMaxQuerySizeToLog()) {    extractPosition = this.connection.getMaxQuerySizeToLog() + 5;    truncated = true;    }    profileQueryToLog = StringUtils.toString(queryBuf, 5,    (extractPosition - 5));    if (truncated) {    profileQueryToLog += Messages.getString("MysqlIO.25"); //$NON-NLS-1$    }    }    fetchBeginTime = queryEndTime;    }                //封装成ResultSet    ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType,    resultSetConcurrency, streamResults, catalog, resultPacket,    false, -1L, cachedMetadata);    if (queryWasSlow && !this.serverQueryWasSlow /* don't log slow queries twice */) {    StringBuffer mesgBuf = new StringBuffer(48 +    profileQueryToLog.length());    mesgBuf.append(Messages.getString("MysqlIO.SlowQuery",    new Object[] {Long.valueOf(this.slowQueryThreshold),    queryTimingUnits,    Long.valueOf(queryEndTime - queryStartTime)}));    mesgBuf.append(profileQueryToLog);    ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);    eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_SLOW_QUERY,    "", catalog, this.connection.getId(), //$NON-NLS-1$    (callingStatement != null) ? callingStatement.getId() : 999,    ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),    (int) (queryEndTime - queryStartTime), queryTimingUnits, null,    new Throwable(), mesgBuf.toString()));    if (this.connection.getExplainSlowQueries()) {    if (oldPacketPosition < MAX_QUERY_SIZE_TO_EXPLAIN) {    explainSlowQuery(queryPacket.getBytes(5,    (oldPacketPosition - 5)), profileQueryToLog);    } else {    this.connection.getLog().logWarn(Messages.getString(    "MysqlIO.28") //$NON-NLS-1$    +MAX_QUERY_SIZE_TO_EXPLAIN +    Messages.getString("MysqlIO.29")); //$NON-NLS-1$    }    }    }    if (this.logSlowQueries) {    ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);    if (this.queryBadIndexUsed && this.profileSql) {    eventSink.consumeEvent(new ProfilerEvent(    ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$    this.connection.getId(),    (callingStatement != null) ? callingStatement.getId()    : 999, ((ResultSetImpl)rs).resultId,    System.currentTimeMillis(),    (queryEndTime - queryStartTime), this.queryTimingUnits,    null,    new Throwable(),    Messages.getString("MysqlIO.33") //$NON-NLS-1$    +profileQueryToLog));    }    if (this.queryNoIndexUsed && this.profileSql) {    eventSink.consumeEvent(new ProfilerEvent(    ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$    this.connection.getId(),    (callingStatement != null) ? callingStatement.getId()    : 999, ((ResultSetImpl)rs).resultId,    System.currentTimeMillis(),    (queryEndTime - queryStartTime), this.queryTimingUnits,    null,    new Throwable(),    Messages.getString("MysqlIO.35") //$NON-NLS-1$    +profileQueryToLog));    }        if (this.serverQueryWasSlow && this.profileSql) {    eventSink.consumeEvent(new ProfilerEvent(    ProfilerEvent.TYPE_SLOW_QUERY, "", catalog, //$NON-NLS-1$    this.connection.getId(),    (callingStatement != null) ? callingStatement.getId()    : 999, ((ResultSetImpl)rs).resultId,    System.currentTimeMillis(),    (queryEndTime - queryStartTime), this.queryTimingUnits,    null,    new Throwable(),    Messages.getString("MysqlIO.ServerSlowQuery") //$NON-NLS-1$    +profileQueryToLog));    }    }    if (this.profileSql) {    fetchEndTime = getCurrentTimeNanosOrMillis();    ProfilerEventHandler eventSink = ProfilerEventHandlerFactory.getInstance(this.connection);    eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_QUERY,    "", catalog, this.connection.getId(), //$NON-NLS-1$    (callingStatement != null) ? callingStatement.getId() : 999,    ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),    (queryEndTime - queryStartTime), this.queryTimingUnits,    null,    new Throwable(), profileQueryToLog));    eventSink.consumeEvent(new ProfilerEvent(ProfilerEvent.TYPE_FETCH,    "", catalog, this.connection.getId(), //$NON-NLS-1$    (callingStatement != null) ? callingStatement.getId() : 999,    ((ResultSetImpl)rs).resultId, System.currentTimeMillis(),    (fetchEndTime - fetchBeginTime), this.queryTimingUnits,    null,    new Throwable(), null));    }    if (this.hadWarnings) {    scanForAndThrowDataTruncation();    }    if (this.statementInterceptors != null) {    ResultSetInternalMethods interceptedResults = invokeStatementInterceptorsPost(    query, callingStatement, rs, false, null);    if (interceptedResults != null) {    rs = interceptedResults;    }    }    return rs;    } catch (SQLException sqlEx) {    if (this.statementInterceptors != null) {    invokeStatementInterceptorsPost(    query, callingStatement, null, false, sqlEx); // we don't do anything with the result set in this case    }        if (callingStatement != null) {    synchronized (callingStatement.cancelTimeoutMutex) {    if (callingStatement.wasCancelled) {SQLException cause = null;if (callingStatement.wasCancelledByTimeout) {cause = new MySQLTimeoutException();} else {cause = new MySQLStatementCancelledException();}callingStatement.resetCancelledState();throw cause;}    }    }        throw sqlEx;    } finally {    this.statementExecutionDepth--;    }    }

?MysqlIO.readAllResults(..)处理生成ResultSet

 ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows,        int resultSetType, int resultSetConcurrency, boolean streamResults,        String catalog, Buffer resultPacket, boolean isBinaryEncoded,        long preSentColumnCount, Field[] metadataFromCache)        throws SQLException {        //设置指针        resultPacket.setPosition(resultPacket.getPosition() - 1);        //读取第一条数据        ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement,                maxRows, resultSetType, resultSetConcurrency, streamResults,                catalog, resultPacket, isBinaryEncoded, preSentColumnCount,                metadataFromCache);        ResultSetImpl currentResultSet = topLevelResultSet;        boolean checkForMoreResults = ((this.clientParam &            CLIENT_MULTI_RESULTS) != 0);        boolean serverHasMoreResults = (this.serverStatus &            SERVER_MORE_RESULTS_EXISTS) != 0;        //        // TODO: We need to support streaming of multiple result sets        //        if (serverHasMoreResults && streamResults) {            //clearInputStream();//            //throw SQLError.createSQLException(Messages.getString("MysqlIO.23"), //$NON-NLS-1$                //SQLError.SQL_STATE_DRIVER_NOT_CAPABLE);        if (topLevelResultSet.getUpdateCount() != -1) {        tackOnMoreStreamingResults(topLevelResultSet);        }                reclaimLargeReusablePacket();                return topLevelResultSet;        }        boolean moreRowSetsExist = checkForMoreResults & serverHasMoreResults;        while (moreRowSetsExist) {        Buffer fieldPacket = checkErrorPacket();            fieldPacket.setPosition(0);            ResultSetImpl newResultSet = readResultsForQueryOrUpdate(callingStatement,                    maxRows, resultSetType, resultSetConcurrency,                    streamResults, catalog, fieldPacket, isBinaryEncoded,                    preSentColumnCount, metadataFromCache);            currentResultSet.setNextResultSet(newResultSet);            currentResultSet = newResultSet;            moreRowSetsExist = (this.serverStatus & SERVER_MORE_RESULTS_EXISTS) != 0;        }        if (!streamResults) {            clearInputStream();        }        reclaimLargeReusablePacket();        return topLevelResultSet;    }============================= protected final ResultSetImpl readResultsForQueryOrUpdate(        StatementImpl callingStatement, int maxRows, int resultSetType,        int resultSetConcurrency, boolean streamResults, String catalog,        Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount,        Field[] metadataFromCache) throws SQLException {        long columnCount = resultPacket.readFieldLength();        if (columnCount == 0) {            return buildResultSetWithUpdates(callingStatement, resultPacket);        } else if (columnCount == Buffer.NULL_LENGTH) {            String charEncoding = null;            if (this.connection.getUseUnicode()) {                charEncoding = this.connection.getEncoding();            }            String fileName = null;            if (this.platformDbCharsetMatches) {                fileName = ((charEncoding != null)                    ? resultPacket.readString(charEncoding, getExceptionInterceptor())                    : resultPacket.readString());            } else {                fileName = resultPacket.readString();            }            return sendFileToServer(callingStatement, fileName);        } else {            //获取结果集            com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement,                    columnCount, maxRows, resultSetType, resultSetConcurrency,                    streamResults, catalog, isBinaryEncoded,                    metadataFromCache);            return results;        }    }=========================    //对数据进行解析封装生成结果集    protected ResultSetImpl getResultSet(StatementImpl callingStatement,        long columnCount, int maxRows, int resultSetType,        int resultSetConcurrency, boolean streamResults, String catalog,        boolean isBinaryEncoded, Field[] metadataFromCache)        throws SQLException {        Buffer packet; // The packet from the server        //字段数组        Field[] fields = null;        // Read in the column information        if (metadataFromCache == null /* we want the metadata from the server */) {            fields = new Field[(int) columnCount];            for (int i = 0; i < columnCount; i++) {            Buffer fieldPacket = null;                //循环处理                fieldPacket = readPacket();                fields[i] = unpackField(fieldPacket, false);            }        } else {        for (int i = 0; i < columnCount; i++) {        skipPacket();        }        }         //        packet = reuseAndReadPacket(this.reusablePacket);                readServerStatusForResultSets(packet);//// Handle cursor-based fetch first//if (this.connection.versionMeetsMinimum(5, 0, 2)&& this.connection.getUseCursorFetch()&& isBinaryEncoded&& callingStatement != null&& callingStatement.getFetchSize() != 0&& callingStatement.getResultSetType() == ResultSet.TYPE_FORWARD_ONLY) {ServerPreparedStatement prepStmt = (com.mysql.jdbc.ServerPreparedStatement) callingStatement;boolean usingCursor = true;//// Server versions 5.0.5 or newer will only open// a cursor and set this flag if they can, otherwise// they punt and go back to mysql_store_results() behavior//if (this.connection.versionMeetsMinimum(5, 0, 5)) {usingCursor = (this.serverStatus &SERVER_STATUS_CURSOR_EXISTS) != 0;}if (usingCursor) {RowData rows = new RowDataCursor(this,prepStmt,fields);ResultSetImpl rs = buildResultSetWithRows(callingStatement,catalog,fields,rows, resultSetType, resultSetConcurrency, isBinaryEncoded);if (usingCursor) {        rs.setFetchSize(callingStatement.getFetchSize());        }return rs;}}        RowData rowData = null;        if (!streamResults) {            //封装成RowData的数据            rowData = readSingleRowSet(columnCount, maxRows,                    resultSetConcurrency, isBinaryEncoded,                    (metadataFromCache == null) ? fields : metadataFromCache);        } else {            rowData = new RowDataDynamic(this, (int) columnCount,            (metadataFromCache == null) ? fields : metadataFromCache,                    isBinaryEncoded);            this.streamingData = rowData;        }        //创建ResultSetImpl对象        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog,        (metadataFromCache == null) ? fields : metadataFromCache,            rowData, resultSetType, resultSetConcurrency, isBinaryEncoded);        return rs;    }==================================   //对rowData的封装   private RowData readSingleRowSet(long columnCount, int maxRows,        int resultSetConcurrency, boolean isBinaryEncoded, Field[] fields)        throws SQLException {        RowData rowData;        ArrayList rows = new ArrayList();        boolean useBufferRowExplicit = useBufferRowExplicit(fields);        // Now read the data        //读取数据        ResultSetRow row = nextRow(fields, (int) columnCount, isBinaryEncoded,                resultSetConcurrency, false, useBufferRowExplicit, false, null);        int rowCount = 0;        if (row != null) {            rows.add(row);            rowCount = 1;        }        while (row != null) {                //读取全部数据,封装至list        row = nextRow(fields, (int) columnCount, isBinaryEncoded,                    resultSetConcurrency, false, useBufferRowExplicit, false, null);            if (row != null) {            if ((maxRows == -1) || (rowCount < maxRows)) {            rows.add(row);            rowCount++;            }            }        }        rowData = new RowDataStatic(rows);        return rowData;    }=============================   //创建ResultSetImpl对象    private com.mysql.jdbc.ResultSetImpl buildResultSetWithRows(        StatementImpl callingStatement, String catalog,        com.mysql.jdbc.Field[] fields, RowData rows, int resultSetType,        int resultSetConcurrency, boolean isBinaryEncoded)        throws SQLException {        ResultSetImpl rs = null;         //根据所传的resultSet常量参数生成相应模式的ResultSetImpl        switch (resultSetConcurrency) {        case java.sql.ResultSet.CONCUR_READ_ONLY:            rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows,                    this.connection, callingStatement, false);            if (isBinaryEncoded) {                rs.setBinaryEncoded();            }            break;        case java.sql.ResultSet.CONCUR_UPDATABLE:            rs = com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows,                    this.connection, callingStatement, true);            break;        default:            return com.mysql.jdbc.ResultSetImpl.getInstance(catalog, fields, rows,                this.connection, callingStatement, false);        }        rs.setResultSetType(resultSetType);        rs.setResultSetConcurrency(resultSetConcurrency);        return rs;    }
?

?

?

?

?

?

?

?

?

?

热点排行