Mike Schaeffer's Blog

Articles with tag: ksm
July 27, 2013

The other day, our team ran across an interesting design problem related to Java synchronization. Without getting into the details of why this problem came up, the gist of it was that we had to somehow write a synchronization wrapper around OutputStream. (Well, technically, it was a BufferedWriter, but the issue is the same.) This wrapper needed to correctly allow multiple unsynchronized writer threads to write to an underlying writer, with each thread atomically writing lines of text. We wound up avoiding the problem by changing our interface, but the solution to the OutputStream problem still provides an interesting look at a lesser-known aspect of the Java runtime: ThreadLocal variables.

To illustrate the problem, consider this simpler version of the OutputStream interface. (Unlike the full version, this interface eliminates the two bulk-write forms of the write method, which don’t matter for this conversation.)

interface SimpleOutputStream {
    void write(int b);
}

The simplest way to write a synchronized wrapper for this interface is to wrap each call to the underlying implementation method in a synchronized block. This wrapper then guarantees that each thread entering write will have exclusive and atomic access to write its byte to the underlying implementation.

public void write(int b)
{
    synchronized(underlying)
    {
        underlying.write(b);
    }
}

The difficulty with this approach is that the locking is too fine-grained to produce a coherent stream of output bytes on the underlying writer. A context switch in the middle of a line of output text will cause that line to contain bytes from two source threads, with no way to distinguish one thread’s data from another’s. The only thing the locking has provided is a guarantee that we won’t have multiple threads calling into the underlying stream at the same time. What we need is a way for our wrapper to buffer the lines of text on a per-thread basis, and then write full lines within a synchronized block. This is where ThreadLocal comes into play.

An instance of ThreadLocal is exactly what it sounds like: a container for a value that is local to a thread. Each thread that gets a value from an instance of a ThreadLocal gets its own, unique copy of that value. Ignoring how exactly it might work, this is the abstraction that will enable our implementation of OutputStream to buffer lines of text from each writer thread, prior to writing them out. The key is in the specific use of the thread local.

ThreadLocal<StringBuffer> threadBuf = new ThreadLocal<StringBuffer>() {
    @Override
    protected StringBuffer initialValue() {
        return new StringBuffer();
    }
};

This code allocates an instance of an anonymous subclass of ThreadLocal specialized to hold a StringBuffer. The overridden initialValue method determines the initial value of the ThreadLocal for each thread that retrieves a value; it is called once for each thread, the first time that thread calls get. In this specific case, it allocates a new StringBuffer for each thread that requests a value from threadBuf. This provides us with a line buffer per thread. From here on out, the implementation is largely connecting the dots.
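To see the per-thread isolation on its own before wiring it into a stream, here is a minimal sketch. The class name ThreadLocalDemo and its run method are made up for illustration, and the lambdas assume Java 8 or later. Each worker appends to what looks like the same threadBuf, yet only ever sees its own text.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of the original sample project.
class ThreadLocalDemo {
    // One StringBuffer per thread, created lazily by initialValue.
    static final ThreadLocal<StringBuffer> threadBuf = new ThreadLocal<StringBuffer>() {
        @Override
        protected StringBuffer initialValue() {
            return new StringBuffer();
        }
    };

    // Starts threadCount workers that each append to "their" buffer, and
    // returns what each worker found in its buffer, keyed by thread name.
    static Map<String, String> run(int threadCount) throws InterruptedException {
        final Map<String, String> seen = new ConcurrentHashMap<>();
        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++) {
            final String name = "worker-" + i;
            threads[i] = new Thread(() -> {
                // No synchronization needed: this buffer belongs to this thread.
                threadBuf.get().append(name);
                threadBuf.get().append('!');
                seen.put(name, threadBuf.get().toString());
            }, name);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

If the buffer were an ordinary shared field, the workers' text would pile up in a single StringBuffer instead of staying separate.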

First, we need an implementation of write that buffers into threadBuf:

public void write(int b)
      throws IOException
{
    threadBuf.get().append((char)b);
 
    if (b == (int)'\n')
        flush();
}

The get method pulls the thread local buffer out of threadBuf. Note that because this buffer is thread local, it is not shared, so we don’t need to synchronize on it.

Second, we need an implementation of flush that atomically writes the current thread line buffer to the underlying output:

protected void flush()
    throws IOException
{
    synchronized(underlying) {
        underlying.write(threadBuf.get().toString().getBytes());
 
        threadBuf.get().setLength(0);
    }
}

Here we do need synchronization, because underlying is shared between threads, even if the threadBuf is not.

While there are still a few implementation details to worry about, this provides the bulk of the functionality we were originally looking for. Multiple threads can write into an output stream wrapped in this way, and the output will be synchronized on a per-line basis. If you’d like to experiment with the idea a bit, or read through an implementation, I’ve put code on GitHub. This sample project contains a synchronized output stream, as well as a test program that launches multiple threads, all writing to System.out via our synchronized wrapper. A command line flag lets you turn the wrapper off to see the ill effects of finer-grained synchronization.
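For readers who want the pieces above in one place, here is a minimal sketch of how they might fit together. It is not the code from the sample project. The class name LineSyncOutputStream, the flushLine helper (named to avoid clashing with OutputStream.flush), and the demo driver are all assumptions made for illustration. The sketch also encodes with ISO-8859-1 so that every byte value survives the trip through the StringBuffer, which is a choice of mine and not something the snippets above do.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical wrapper class assembled from the snippets in this post.
class LineSyncOutputStream extends OutputStream {
    private final OutputStream underlying;

    // One line buffer per writer thread.
    private final ThreadLocal<StringBuffer> threadBuf = new ThreadLocal<StringBuffer>() {
        @Override
        protected StringBuffer initialValue() {
            return new StringBuffer();
        }
    };

    LineSyncOutputStream(OutputStream underlying) {
        this.underlying = underlying;
    }

    @Override
    public void write(int b) throws IOException {
        // OutputStream.write(int) only uses the low eight bits of b.
        threadBuf.get().append((char) (b & 0xFF));
        if ((b & 0xFF) == '\n') {
            flushLine();
        }
    }

    // Writes the calling thread's buffered line to the shared stream as one unit.
    private void flushLine() throws IOException {
        StringBuffer buf = threadBuf.get();
        // ISO-8859-1 maps chars 0..255 back to the same byte values.
        byte[] line = buf.toString().getBytes(StandardCharsets.ISO_8859_1);
        synchronized (underlying) {
            underlying.write(line);
        }
        buf.setLength(0);
    }

    // Hypothetical driver: several threads write whole lines through one
    // wrapper; returns everything that reached the underlying stream.
    static String demo(int threadCount, int linesPerThread) throws InterruptedException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        final OutputStream out = new LineSyncOutputStream(sink);
        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                try {
                    for (int n = 0; n < linesPerThread; n++) {
                        // The inherited write(byte[]) calls write(int) once per byte.
                        out.write(("thread-" + id + " line " + n + "\n")
                                .getBytes(StandardCharsets.ISO_8859_1));
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        return new String(sink.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.print(demo(4, 5));
    }
}
```

The order of lines in the output varies from run to run, but each line should arrive intact. One of the remaining implementation details is visible here: text that does not end in a newline stays in its thread's buffer, so a fuller version would also need to handle flush and close.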

Tags: java, ksm
July 22, 2013

One of the best things about writing code in the Java ecosystem is that so much of the underlying platform is open source. This makes it easy to get good answers to questions about how the platform actually works. To illustrate, I’ll walk through the JVM code to show why threads performing traditional Java I/O can’t be interrupted.

The core issue with interrupting a Java thread performing I/O is that the JVM issues the underlying system call in a way that is not interruptible. Let’s see why that is, for a FileInputStream.

The start of the call stack is in the Java standard class library. In a traditional source tree, this is located under: ${JDK_SRC_ROOT}/jdk/src/share/classes/. In keeping with Java tradition, the class library code is structured with a directory per package. Looking at the code for FileInputStream, both bulk read operations delegate to a method readBytes:

public int read(byte b[]) throws IOException {
    return readBytes(b, 0, b.length);
}
// ...
public int read(byte b[], int off, int len) throws IOException {
    return readBytes(b, off, len);
}

readBytes, however, is declared to be native:

private native int readBytes(byte b[], int off, int len) throws IOException;

The native code for the class library is located in a parallel directory structure, under ${JDK_SRC_ROOT}/jdk/src/share/native/.... Like the Java code, this native code is structured with a file per class and a directory per package. Looking at this code, you can see that the function name is structured to include the Java class name, and the prototype contains extra parameters that the JVM uses to manage internal state: env stores a pointer to the current thread’s JVM environment, and this is an explicit declaration of the usual implicit Java this argument. The remaining three arguments are the arguments declared in the original source file.

JNIEXPORT jint JNICALL
Java_java_io_FileInputStream_readBytes(JNIEnv *env, jobject this,
    jbyteArray bytes, jint off, jint len) {
    return readBytes(env, this, bytes, off, len, fis_fd);
}

At this point, the class library is calling into another layer of code, outside the class library. Most of the Java policy surrounding file I/O is implemented in readBytes. (Note particularly the call to malloc on line 23. Traditional file I/O in Java is implemented by reading into a local buffer, and then copying from the local buffer into the Java byte[] initially passed into int read(byte buf[]). This double copy is slow, and it also requires heap allocation of a second read buffer if the read buffer is larger than 8K. The last time I was reading this code, it was to diagnose an OutOfMemoryError caused by an over-large buffer size.)

jint
readBytes(JNIEnv *env, jobject this, jbyteArray bytes,
          jint off, jint len, jfieldID fid)
{
    jint nread;
    char stackBuf[BUF_SIZE];
    char *buf = NULL;
    FD fd;
 
    if (IS_NULL(bytes)) {
        JNU_ThrowNullPointerException(env, NULL);
        return -1;
    }
 
    if (outOfBounds(env, off, len, bytes)) {
        JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", NULL);
        return -1;
    }
 
    if (len == 0) {
        return 0;
    } else if (len > BUF_SIZE) {
        buf = malloc(len);
        if (buf == NULL) {
            JNU_ThrowOutOfMemoryError(env, NULL);
            return 0;
        }
    } else {
        buf = stackBuf;
    }
 
    fd = GET_FD(this, fid);
    if (fd == -1) {
        JNU_ThrowIOException(env, "Stream Closed");
        nread = -1;
    } else {
        nread = IO_Read(fd, buf, len);
        if (nread > 0) {
            (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
        } else if (nread == JVM_IO_ERR) {
            JNU_ThrowIOExceptionWithLastError(env, "Read error");
        } else if (nread == JVM_IO_INTR) {
            JNU_ThrowByName(env, "java/io/InterruptedIOException", NULL);
        } else { /* EOF */
            nread = -1;
        }
    }
 
    if (buf != stackBuf) {
        free(buf);
    }
    return nread;
}

The meat of the read is done by the IO_Read in line 37. This is aliased with a preprocessor definition to JVM_Read, which is a JVM primitive operation. JVM primitives are outside the Java class library, and are in the HotSpot JVM itself. This particular primitive is defined in ${JDK_SRC_ROOT}/hotspot/src/share/vm/prims/jvm.cpp. (In case you’re wondering how I’ve been finding these functions outside of the class library, I usually use a code text search facility.)

JVM_LEAF(jint, JVM_Read(jint fd, char *buf, jint nbytes))
  JVMWrapper2("JVM_Read (0x%x)", fd);
 
  //%note jvm_r6
  return (jint)os::restartable_read(fd, buf, nbytes);
JVM_END

os::restartable_read is the point where the code path leaves code common to all JVM platforms and enters OS-specific code. For the Solaris (Unix) code path, the definition looks like this:

size_t os::restartable_read(int fd, void *buf, unsigned int nBytes) {
  INTERRUPTIBLE_RETURN_INT(::read(fd, buf, nBytes), os::Solaris::clear_interrupted);
}

Line 2 of this code, finally, is the OS system call itself: ::read. However, it’s wrapped in a call to the macro INTERRUPTIBLE_RETURN_INT, which turns out to be a standard retry loop for a Unix system call.

#define INTERRUPTIBLE_RETURN_INT(_cmd, _clear) do { \
  int _result; \
  do { \
    INTERRUPTIBLE(_cmd, _result, _clear); \
  } while((_result == OS_ERR) && (errno == EINTR)); \
  return _result; \
} while(false)

This macro expands into a loop that reissues the system call for as long as it fails with errno set to EINTR, the error code for an interrupted system call. Unless the system call fails for some other reason, the JVM will keep retrying the read whenever it’s interrupted. To get interruptible I/O semantics, you have to call the OS differently.
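For contrast, the JDK does document one family of I/O classes as interruptible: the NIO channels that implement java.nio.channels.InterruptibleChannel. This is a different API from the FileInputStream path traced above, so treat the following as a side-by-side sketch and not as part of that code path. The class name InterruptibleReadDemo and its outcome strings are made up for illustration. A reader thread blocks on a java.nio.channels.Pipe that never receives data, and another thread interrupts it.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the outcome strings are just labels for this sketch.
class InterruptibleReadDemo {
    static String run() throws IOException, InterruptedException {
        final Pipe pipe = Pipe.open();
        final AtomicReference<String> outcome = new AtomicReference<>("still blocked");

        Thread reader = new Thread(() -> {
            try {
                // Nothing is ever written to the pipe, so this read blocks.
                pipe.source().read(ByteBuffer.allocate(16));
                outcome.set("read returned");
            } catch (ClosedByInterruptException e) {
                // Documented behavior for a thread interrupted while blocked
                // on an InterruptibleChannel: the channel is closed and this
                // exception is thrown in the blocked thread.
                outcome.set("interrupted");
            } catch (IOException e) {
                outcome.set("io error: " + e);
            }
        });
        reader.setDaemon(true); // don't keep the JVM alive if the read never returns
        reader.start();

        Thread.sleep(200);      // give the reader time to block in read
        reader.interrupt();
        reader.join(5000);

        String result = outcome.get();
        pipe.sink().close();
        pipe.source().close();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The price of this design is that the interrupt closes the channel, so the read cannot simply be resumed afterwards.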

Slava Pestov has written a nice piece on how EINTR is used by Unix.

Unix’s use of EINTR is one of the original aspects of Unix that led to ‘worse is better’. Other operating systems of the time went to greater lengths to handle long-running system calls. ITS would interrupt the system call and then arrange for it to be restarted after the interruption, but with parameters that allowed it to pick up where it left off.

See Also: The original paper on ITS system call restarts.

Tags: java, ksm
July 2, 2013

I recently blogged about solving problems with thread local storage. In that instance, thread local storage was a way to add a form of ‘after the fact’ synchronization to an interface that was initially designed to be called from a single thread. Thread local storage is useful for this purpose, because it allows each thread to easily isolate the data it manipulates from other threads. Unfortunately, while this is the strength of thread local storage, it is also the weakness. In modern multi-threaded designs, threading issues are often abstracted behind thread pools and work queues. With these abstractions at work, the threads themselves become an implementation detail, and thread local variables are too low level to serve some useful scenarios.

One way to address this issue is to use some of the techniques from an earlier post on dynamic extent. The gist of the idea is to provide Runnables with a way of reconstructing relevant thread local state at the time they are invoked on a pool thread. This maps well to the idea of ‘dynamic extent’, as presented in the earlier post. ‘Establishing the precondition’ is initializing the thread locals for the run, and ‘Establishing the post condition’ is restoring their original values. Here’s how it might look in code:

// This is the thread local storage that we want to migrate to worker threads.
ThreadLocal<Object> tls = new ThreadLocal<Object>();
 
Runnable bindToTls(final Runnable runnable)
{
   return new Runnable() {
      // Captured at the time/thread making the initial call to bindToTls
      final Object boundTls = tls.get();
 
      public void run() {
         // Captured at the time the Runnable is invoked
         Object initialTls = tls.get();
 
         try {
            tls.set(boundTls);
 
            runnable.run();
         } finally {
            tls.set(initialTls);
         }
      }
   };
}

Calling bindToTls on a Runnable gives a new instance of Runnable that remembers the thread local state at the time of the call to bindToTls. The returned Runnable can then be provided to a thread pool, and when it is run, it will reinstate that thread local state on the pool thread for the duration of the run. At the end of the run, it ensures that the thread local is restored to its original value. (This eliminates the possibility of certain classes of errors, including potential memory leaks.)
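Here is a small sketch of what that looks like against a real thread pool. The class name BindToTlsDemo, the "request-42" value, and the log format are invented for the example, and the lambdas assume Java 8 or later. bindToTls itself follows the code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class wrapping the bindToTls function from this post.
class BindToTlsDemo {
    static final ThreadLocal<Object> tls = new ThreadLocal<Object>();

    static Runnable bindToTls(final Runnable runnable) {
        return new Runnable() {
            // Captured on the thread that calls bindToTls.
            final Object boundTls = tls.get();

            public void run() {
                // Captured on the pool thread that runs the task.
                Object initialTls = tls.get();
                try {
                    tls.set(boundTls);
                    runnable.run();
                } finally {
                    tls.set(initialTls);
                }
            }
        };
    }

    // Runs a bound task and then an unbound task on the same pool thread,
    // and returns what each one observed in tls.
    static List<String> run() throws Exception {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            tls.set("request-42"); // illustrative value set on the submitting thread
            Runnable bound = bindToTls(() -> log.add("bound: " + tls.get()));
            Runnable unbound = () -> log.add("unbound: " + tls.get());

            pool.submit(bound).get();   // sees the submitting thread's value
            pool.submit(unbound).get(); // pool thread's own value is back in place
        } finally {
            tls.remove();
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The second task runs on the same pool thread as the first. It finds tls empty again, which is the restore step in the finally block doing its job.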

One caveat to this version of bindToTls is that it only migrates the one thread local variable that it’s been written to migrate. While this could be made considerably more general, my suggestion is that it’s usually unnecessary to do so. My current project uses one instance of bindToTls to provide a custom Spring scope. From there, Spring provides all the generality we need, and the lower level thread management code is kept contained, which is as it should be.

Tags: java, ksm