public abstract class FileResourceConsumer extends java.lang.Object implements java.util.concurrent.Callable<IFileProcessorFutureResult>
| Modifier and Type | Class and Description |
|---|---|
| private static class | FileResourceConsumer.STATE |
| Modifier and Type | Field and Description |
|---|---|
| private int | consumerId |
| private FileStarted | currentFile |
| private FileResourceConsumer.STATE | currentState |
| static java.lang.String | ELAPSED_MILLIS |
| private java.util.concurrent.ArrayBlockingQueue<FileResource> | fileQueue |
| static java.lang.String | IO_IS |
| static java.lang.String | IO_OS |
| private java.lang.Object | lock |
| protected static org.slf4j.Logger | LOG |
| private long | maxConsecWaitInMillis |
| private static java.util.concurrent.atomic.AtomicInteger | numConsumers |
| private int | numHandledExceptions |
| private int | numResourcesConsumed |
| static java.lang.String | OOM |
| static java.lang.String | PARSE_ERR |
| static java.lang.String | PARSE_EX |
| static java.lang.String | TIMED_OUT |
| Constructor and Description |
|---|
| FileResourceConsumer(java.util.concurrent.ArrayBlockingQueue<FileResource> fileQueue) |
| Modifier and Type | Method and Description |
|---|---|
| private boolean | _processFileResource(FileResource fileResource) |
| IFileProcessorFutureResult | call() |
| FileStarted | checkForTimedOutMillis(long staleThresholdMillis)<br>Checks to see if the currentFile being processed (if there is one) should be timed out (still being worked on after staleThresholdMillis). |
| protected void | close(java.io.Closeable closeable) |
| protected void | flushAndClose(java.io.Closeable closeable) |
| FileStarted | getCurrentFile()<br>Returns the name and start time of a file that is currently being processed. |
| private FileResource | getNextFileResource() |
| int | getNumHandledExceptions() |
| int | getNumResourcesConsumed() |
| protected java.lang.String | getXMLifiedLogMsg(java.lang.String type, java.lang.String resourceId, java.lang.String... attrs) |
| protected java.lang.String | getXMLifiedLogMsg(java.lang.String type, java.lang.String resourceId, java.lang.Throwable t, java.lang.String... attrs)<br>Use this for structured output that captures resourceId and other attributes. |
| protected void | incrementHandledExceptions()<br>Make sure to call this appropriately! |
| boolean | isStillActive()<br>Returns whether the consumer could still process a file or is still processing a file (ACTIVELY_CONSUMING or ASKED_TO_SHUTDOWN). |
| protected void | parse(java.lang.String resourceId, Parser parser, java.io.InputStream is, org.xml.sax.ContentHandler handler, Metadata m, ParseContext parseContext)<br>Utility method to handle logging equivalently among all implementing classes. |
| void | pleaseShutdown()<br>This politely asks the consumer to shut down. |
| abstract boolean | processFileResource(FileResource fileResource)<br>Main piece of code that needs to be implemented. |
| private void | setEndedState(FileResourceConsumer.STATE cause) |
protected static final org.slf4j.Logger LOG
public static java.lang.String TIMED_OUT
public static java.lang.String OOM
public static java.lang.String IO_IS
public static java.lang.String IO_OS
public static java.lang.String PARSE_ERR
public static java.lang.String PARSE_EX
public static java.lang.String ELAPSED_MILLIS
private static java.util.concurrent.atomic.AtomicInteger numConsumers
private long maxConsecWaitInMillis
private final java.util.concurrent.ArrayBlockingQueue<FileResource> fileQueue
private final int consumerId
private final java.lang.Object lock
private FileStarted currentFile
private volatile int numResourcesConsumed
private volatile int numHandledExceptions
private volatile FileResourceConsumer.STATE currentState
public FileResourceConsumer(java.util.concurrent.ArrayBlockingQueue<FileResource> fileQueue)
public IFileProcessorFutureResult call()
Specified by:
call in interface java.util.concurrent.Callable<IFileProcessorFutureResult>
public abstract boolean processFileResource(FileResource fileResource)
Main piece of code that needs to be implemented. Make sure to call incrementHandledExceptions() appropriately in
your implementation of this method.
Parameters:
fileResource - resource to process
protected void incrementHandledExceptions()
public boolean isStillActive()
private boolean _processFileResource(FileResource fileResource)
public void pleaseShutdown()
This offers another method for politely requesting
that a FileResourceConsumer stop processing
besides passing it PoisonFileResource.
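The two shutdown routes described above (a sentinel resource on the queue, or a direct request to the consumer) can be sketched with plain JDK classes. This is a minimal illustration and not Tika's implementation: `ShutdownSketch`, the `POISON` string, the polling interval, and the way the maximum consecutive wait is applied are all assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified stand-in for illustration only; this is NOT Tika's FileResourceConsumer.
class ShutdownSketch {
    // Hypothetical sentinel playing the role of PoisonFileResource.
    static final String POISON = "__POISON__";
    // Hypothetical polling interval.
    private static final long POLL_MILLIS = 10;

    private final ArrayBlockingQueue<String> queue;
    // Assumed meaning: give up after waiting this long without receiving anything.
    private final long maxConsecWaitMillis;
    // volatile so a request made from another thread is seen by the consuming loop.
    private volatile boolean askedToShutdown = false;

    ShutdownSketch(ArrayBlockingQueue<String> queue, long maxConsecWaitMillis) {
        this.queue = queue;
        this.maxConsecWaitMillis = maxConsecWaitMillis;
    }

    // Analogous to pleaseShutdown(): the loop stops before taking another item.
    void pleaseShutdown() {
        askedToShutdown = true;
    }

    // Consumes items until the sentinel arrives, a shutdown is requested,
    // or nothing has arrived for maxConsecWaitMillis. Returns the number consumed.
    int run() {
        int consumed = 0;
        long waited = 0;
        try {
            while (!askedToShutdown && waited < maxConsecWaitMillis) {
                String next = queue.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
                if (next == null) {
                    waited += POLL_MILLIS; // nothing yet; loop re-checks the flag
                    continue;
                }
                waited = 0;
                if (POISON.equals(next)) {
                    break; // sentinel: stop without counting it
                }
                consumed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return consumed;
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
        queue.add("a.txt");
        queue.add("b.txt");
        queue.add(POISON);
        System.out.println(new ShutdownSketch(queue, 1000).run() + " resources consumed");
    }
}
```

The flag-based route is useful when the producer cannot enqueue a sentinel, for example when the queue is full or the producer has already exited.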
public FileStarted getCurrentFile()
public int getNumResourcesConsumed()
public int getNumHandledExceptions()
public FileStarted checkForTimedOutMillis(long staleThresholdMillis)
If the consumer should be timed out, this will return the currentFile and set the state to TIMED_OUT.
If the consumer was already timed out earlier, is not processing a file, or has been working on a file for less than staleThresholdMillis, then this will return null.
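The contract described above (return the current file once when it has gone stale, and null in every other case) might look roughly like the following. This is a sketch under stated assumptions, not the library's code: `TimeoutCheckSketch`, `Started`, and the explicit `nowMillis` parameter (used so the example is deterministic) are hypothetical, and only the state names ACTIVELY_CONSUMING and TIMED_OUT come from this page.

```java
// Simplified stand-in for illustration only; this is NOT Tika's implementation.
class TimeoutCheckSketch {
    enum State { ACTIVELY_CONSUMING, TIMED_OUT }

    // Hypothetical analogue of FileStarted: a resource id plus a start time.
    static final class Started {
        final String resourceId;
        final long startMillis;

        Started(String resourceId, long startMillis) {
            this.resourceId = resourceId;
            this.startMillis = startMillis;
        }
    }

    private final Object lock = new Object();
    private Started current; // null when no file is being processed
    private State state = State.ACTIVELY_CONSUMING;

    // Records that work on a resource has begun.
    void start(String resourceId, long nowMillis) {
        synchronized (lock) {
            current = new Started(resourceId, nowMillis);
        }
    }

    // Records that work on the current resource has finished.
    void finish() {
        synchronized (lock) {
            current = null;
        }
    }

    // Returns the stale file and flips the state to TIMED_OUT, or returns null
    // if already timed out, idle, or not yet past the threshold.
    Started checkForTimedOut(long staleThresholdMillis, long nowMillis) {
        synchronized (lock) {
            if (state == State.TIMED_OUT || current == null) {
                return null;
            }
            if (nowMillis - current.startMillis <= staleThresholdMillis) {
                return null;
            }
            state = State.TIMED_OUT;
            return current;
        }
    }

    public static void main(String[] args) {
        TimeoutCheckSketch consumer = new TimeoutCheckSketch();
        consumer.start("a.pdf", 0L);
        Started stale = consumer.checkForTimedOut(1000L, 1500L);
        System.out.println(stale == null ? "not stale" : "stale: " + stale.resourceId);
    }
}
```

Because the state flips on the first positive check, a monitoring thread that polls repeatedly is told about a given stale file only once.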
Parameters:
staleThresholdMillis - threshold to determine whether the consumer has gone stale.
protected java.lang.String getXMLifiedLogMsg(java.lang.String type,
java.lang.String resourceId,
java.lang.String... attrs)
protected java.lang.String getXMLifiedLogMsg(java.lang.String type,
java.lang.String resourceId,
java.lang.Throwable t,
java.lang.String... attrs)
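The attrs argument of this method is documented as alternating keys and values. One plausible way to turn such arguments into a single XML element is sketched below. The output layout, the hand-rolled escaping, the handling of a dangling key, and the `parse_ex` and `elapsedMillis` strings are assumptions for illustration; the actual format produced by getXMLifiedLogMsg is not specified on this page.

```java
// Simplified stand-in for illustration only; the real output format may differ.
class XmlLogSketch {
    // Escapes the characters that are unsafe in XML attribute values and text.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '&': sb.append("&amp;"); break;
                case '<': sb.append("&lt;"); break;
                case '>': sb.append("&gt;"); break;
                case '"': sb.append("&quot;"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    // type becomes the element name, resourceId and attrs become attributes,
    // and the throwable (if any) becomes the element text.
    static String logMsg(String type, String resourceId, Throwable t, String... attrs) {
        StringBuilder sb = new StringBuilder();
        sb.append('<').append(type);
        sb.append(" resourceId=\"").append(escape(resourceId)).append('"');
        if (attrs != null) {
            // attrs alternate key0, value0, key1, value1, ...; a dangling key is ignored.
            for (int i = 0; i + 1 < attrs.length; i += 2) {
                sb.append(' ').append(attrs[i]).append("=\"")
                  .append(escape(attrs[i + 1])).append('"');
            }
        }
        sb.append('>');
        if (t != null) {
            sb.append(escape(String.valueOf(t)));
        }
        sb.append("</").append(type).append('>');
        return sb.toString();
    }

    public static void main(String[] args) {
        // "parse_ex" and "elapsedMillis" are hypothetical example strings.
        System.out.println(logMsg("parse_ex", "a&b.pdf", null, "elapsedMillis", "12"));
    }
}
```

Emitting one element per event keeps each log line machine-readable, so a downstream tool can recover the resourceId and attributes without parsing free-form text.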
Parameters:
type - entity name for exception
resourceId - resourceId string
t - throwable; can be null
attrs - array of alternating keys and values (key0, value0, key1, value1, etc.)
private FileResource getNextFileResource() throws java.lang.InterruptedException
Throws:
java.lang.InterruptedException
protected void close(java.io.Closeable closeable)
protected void flushAndClose(java.io.Closeable closeable)
private void setEndedState(FileResourceConsumer.STATE cause)
protected void parse(java.lang.String resourceId,
Parser parser,
java.io.InputStream is,
org.xml.sax.ContentHandler handler,
Metadata m,
ParseContext parseContext)
throws java.lang.Throwable
Parameters:
resourceId - resourceId
parser - parser to use
is - inputStream (will be closed by this method!)
handler - handler for the content
m - metadata
parseContext - parse context
Throws:
java.lang.Throwable - logs and then rethrows whatever was thrown (if anything)
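The behavior documented for parse (log the failure, always close the input stream, rethrow whatever was thrown) follows a common try/catch/finally shape. The sketch below shows that shape with JDK classes only. `ParseWrapperSketch`, the `StreamParser` interface, and the in-memory `LOG` list are hypothetical stand-ins for Tika's Parser and the SLF4J logger, so this is an assumption about structure rather than the actual method body.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for illustration only; this is NOT Tika's parse method.
class ParseWrapperSketch {
    // Hypothetical stand-in for a parser; not Tika's Parser interface.
    interface StreamParser {
        void parse(InputStream is) throws Exception;
    }

    // In-memory log so the example needs no logging dependency.
    static final List<String> LOG = new ArrayList<>();

    // Runs the parser, logs any failure against the resourceId, rethrows it,
    // and closes the stream whether or not parsing succeeded.
    static void parse(String resourceId, StreamParser parser, InputStream is) throws Throwable {
        try {
            parser.parse(is);
        } catch (Throwable t) {
            LOG.add("parse failed: " + resourceId + " (" + t + ")");
            throw t; // the caller still sees the original throwable
        } finally {
            try {
                is.close();
            } catch (IOException e) {
                LOG.add("close failed: " + resourceId);
            }
        }
    }

    public static void main(String[] args) throws Throwable {
        InputStream is = new ByteArrayInputStream(new byte[] {1, 2, 3});
        // A trivial parser that reads the stream to the end.
        parse("a.bin", in -> { while (in.read() != -1) { } }, is);
        System.out.println("log entries: " + LOG.size());
    }
}
```

Centralizing this in one helper means every subclass logs failures in the same form, and callers should not reuse the stream after the call because it has been closed.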