Author: chirino
Date: 2005-12-01 09:44:12 -0500 (Thu, 01 Dec 2005)
New Revision: 144
Modified:
trunk/activeio/src/java/org/activeio/journal/active/LogFileManager.java
trunk/activeio/src/java/org/activeio/journal/active/RecordInfo.java
trunk/activeio/src/test/org/activeio/journal/active/JournalImplTest.java
Log:
- The journal can now read records that have been moved to archived log files.
Modified: trunk/activeio/src/java/org/activeio/journal/active/LogFileManager.java
===================================================================
--- trunk/activeio/src/java/org/activeio/journal/active/LogFileManager.java 2005-11-19 20:23:52 UTC (rev 143)
+++ trunk/activeio/src/java/org/activeio/journal/active/LogFileManager.java 2005-12-01 14:44:12 UTC (rev 144)
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
+import java.util.HashMap;
import org.activeio.Packet;
import org.activeio.adapter.PacketOutputStream;
@@ -88,6 +89,7 @@
private boolean loadedFromCleanShutDown;
private File archiveDirectory;
+ HashMap openArchivedLogs = new HashMap();
public LogFileManager(File logDirectory) throws IOException {
this(logDirectory, DEFAULT_LOGFILE_COUNT, DEFAULT_LOGFILE_SIZE, null);
@@ -323,7 +325,7 @@
if (firstActiveNode.getId() < lastMark.getLogFileId()) {
if( archiveDirectory!=null ) {
- File file = new File(archiveDirectory, "" + archiveLogNameFormat.format(firstActiveNode.getId()) + ".log");
+ File file = getArchiveFile(firstActiveNode.getId());
firstActiveNode.getLogFile().copyTo(file);
}
@@ -338,27 +340,55 @@
}
}
}
-
+
+ private File getArchiveFile(int logId) {
+ return new File(archiveDirectory, "" + archiveLogNameFormat.format(logId) + ".log");
+ }
+
RecordInfo readRecordInfo(Location location) throws IOException, InvalidRecordLocationException {
+ LogFile logFile;
LogFileNode logFileState = getLogFileWithId(location.getLogFileId());
- // There can be no record at the append offset.
- if (logFileState.getAppendOffset() == location.getLogFileOffset()) {
- throw new InvalidRecordLocationException("No record at (" + location
- + ") found. Location past end of logged data.");
+ if( logFileState !=null ) {
+ // There can be no record at the append offset.
+ if (logFileState.getAppendOffset() == location.getLogFileOffset()) {
+ throw new InvalidRecordLocationException("No record at (" + location
+ + ") found. Location past end of logged data.");
+ }
+ logFile = logFileState.getLogFile();
+ } else {
+ if( archiveDirectory==null ) {
+ throw new InvalidRecordLocationException("Log file: " + location.getLogFileId() + " is not active.");
+ } else {
+ logFile = getArchivedLogFile(location.getLogFileId());
+ }
}
// Is there a record header at the seeked location?
try {
- LogFile logFile = logFileState.getLogFile();
Record header = new Record();
logFile.readRecordHeader(location.getLogFileOffset(), header);
- return new RecordInfo(location, header, logFileState);
+ return new RecordInfo(location, header, logFileState, logFile);
} catch (IOException e) {
throw new InvalidRecordLocationException("No record at (" + location + ") found.");
}
}
+ private LogFile getArchivedLogFile(int logFileId) throws InvalidRecordLocationException, IOException {
+ Integer key = new Integer(logFileId);
+ LogFile rc = (LogFile) openArchivedLogs.get(key);
+ if( rc == null ) {
+ File archiveFile = getArchiveFile(logFileId);
+ if( !archiveFile.canRead() )
+ throw new InvalidRecordLocationException("Log file: " + logFileId + " does not exist.");
+ rc = new LogFile(archiveFile, getInitialLogFileSize());
+ openArchivedLogs.put(key, rc);
+
+ // TODO: turn openArchivedLogs into LRU cache and close old log files.
+ }
+ return rc;
+ }
+
LogFileNode getLogFileWithId(int logFileId) throws InvalidRecordLocationException {
for (LogFileNode lf = firstActiveNode; lf != null; lf = lf.getNextActive()) {
if (lf.getId() == logFileId) {
@@ -369,7 +399,7 @@
if (logFileId < lf.getId())
break;
}
- throw new InvalidRecordLocationException("Log file with id;" + logFileId + " is not active.");
+ return null;
}
/**
@@ -421,7 +451,7 @@
byte data[] = new byte[recordInfo.getHeader().getPayloadLength()];
- LogFile logFile = recordInfo.getLogFileState().getLogFile();
+ LogFile logFile = recordInfo.getLogFile();
logFile.read(recordInfo.getDataOffset(), data);
return new ByteArrayPacket(data);
Modified: trunk/activeio/src/java/org/activeio/journal/active/RecordInfo.java
===================================================================
--- trunk/activeio/src/java/org/activeio/journal/active/RecordInfo.java 2005-11-19 20:23:52 UTC (rev 143)
+++ trunk/activeio/src/java/org/activeio/journal/active/RecordInfo.java 2005-12-01 14:44:12 UTC (rev 144)
@@ -25,11 +25,13 @@
private final Location location;
private final Record header;
private final LogFileNode logFileState;
+ private final LogFile logFile;
- public RecordInfo(Location location, Record header, LogFileNode logFileState) {
+ public RecordInfo(Location location, Record header, LogFileNode logFileState, LogFile logFile) {
this.location = location;
this.header = header;
this.logFileState = logFileState;
+ this.logFile = logFile;
}
int getNextLocation() {
@@ -48,6 +50,10 @@
return logFileState;
}
+ public LogFile getLogFile() {
+ return logFile;
+ }
+
public int getDataOffset() {
return location.getLogFileOffset() + Record.RECORD_HEADER_SIZE;
}
Modified: trunk/activeio/src/test/org/activeio/journal/active/JournalImplTest.java
===================================================================
--- trunk/activeio/src/test/org/activeio/journal/active/JournalImplTest.java 2005-11-19 20:23:52 UTC (rev 143)
+++ trunk/activeio/src/test/org/activeio/journal/active/JournalImplTest.java 2005-12-01 14:44:12 UTC (rev 144)
@@ -39,8 +39,8 @@
Log log = LogFactory.getLog(JournalImplTest.class);
- int size = 1024*512;
- int logFileCount=4;
+ int size = 1024*10;
+ int logFileCount=2;
File logDirectory = new File("test-logfile");
private Journal journal;
@@ -52,7 +52,7 @@
deleteDir(logDirectory);
}
assertTrue("Could not delete directory: "+logDirectory.getCanonicalPath(), !logDirectory.exists() );
- journal = new JournalImpl(logDirectory,logFileCount,size);
+ journal = new JournalImpl(logDirectory,logFileCount, size, logDirectory);
}
/**
@@ -118,6 +118,27 @@
log.info(journal);
}
+ public void testCanReadFromArchivedLogFile() throws InvalidRecordLocationException, InterruptedException, IOException {
+
+ Packet data1 = createPacket("Hello World 1");
+ RecordLocation location1 = journal.write( data1, false);
+
+ Location pos;
+ do {
+
+ Packet p = createPacket("<<<data>>>");
+ pos = (Location) journal.write( p, false);
+ journal.setMark(pos, false);
+
+ } while( pos.getLogFileId() < 5 );
+
+ // Now see if we can read that first packet.
+ Packet data;
+ data = journal.read(location1);
+ assertEquals( data1, data);
+
+ }
+
/**
* @param string
* @return