java.util.concurrent Goodies

The JVM as a virtual machine is great. I'm not in love with Java, as I think too many {} can kill anyone regardless of faith. It has been a really, really long time since I wrote anything for the blog, and today, as it is the summer bank holiday, I decided to finally sit down and write up a few things that are interesting.

java.util.concurrent

Even though I will be talking about java.util.concurrent, I will give a few examples using Scala. I like Scala and I think it is much easier to understand and read than Java. Simply fewer tokens. And fewer tokens mean more fun.

I don't know if you have explored how many nice things there are in the JVM; one of them is Truffle, but I will not be talking about it here. One of the great things is java.util.concurrent, a package that gives us tools to work with concurrency.

In times of agents such a set of tools could feel a bit outdated, but it can still teach us valuable lessons about concurrency and may well be useful in the present/future.

Abstracts

So, as we all know, Java is full of design patterns, and one of the first things you will notice while looking into the docs are the abstract classes and interfaces. They give us an overview of what we can expect in the package. Just like a movie teaser but… boring :). The first thing we notice while looking at http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html is probably BlockingDeque and BlockingQueue. And this is our first example.

ArrayBlockingQueue

If you have ever worked with threads or any concurrent constructs, you know how useful channels/queues are. The first concrete class in the package is ArrayBlockingQueue[T], which lets us construct queues. For those who don't know what a queue is, it is a FIFO construct; FIFO means First In, First Out. So elements that get in first will be picked up at the receiving end of the queue before the rest. It is like a queue for tickets before a big summer blockbuster release.

Let us try this ArrayBlockingQueue out:

import java.util.concurrent._
import scala.util.control.Breaks._

object Example {
  // bounded FIFO queue: holds at most 100 elements at a time
  val queue = new ArrayBlockingQueue[Int](100)


  // producer: pushes 0 to 1000 into the queue, retrying offer until there is room
  val producer1 = new Thread(new Runnable {
    def run() {
      (0 to 1000).map( n => {
        while(!queue.offer(n)){}
      })
    }
  })

  // consumer: takes elements as they arrive and stops after the last one (1000)
  val consumer1 = new Thread(new Runnable {
    def run() {
      breakable {
        while(true){
          val result = queue.take()
          print(result.toString() ++ ",")
          if (result > 999){
            break
          }
        }
      }
    }
  })

  def main(args: Array[String]): Unit = {
    consumer1.start
    producer1.start
  }

}

What are we doing here? We are simply demonstrating a producer and consumer type of situation.

There are a few things to look at. First, the initialization:

val queue = new ArrayBlockingQueue[Int](100)

where we create our queue with a total capacity of 100. ArrayBlockingQueue always needs a fixed capacity (other queues, such as LinkedBlockingQueue, can be left unbounded, but that is risky in terms of memory). We want to omit unpredictable parts of code.
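
As a quick illustration of the bound, here is a minimal sketch showing what offer returns on a full queue (nothing here is specific to our example, it is just the standard API):

val q = new java.util.concurrent.ArrayBlockingQueue[Int](2)
q.offer(1) // true, element added
q.offer(2) // true, the queue is now at capacity
q.offer(3) // false, the queue is full so the element is NOT added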

How to add stuff to the queue

  while(!queue.offer(n)){}

Why like this? Let's not forget it is a blocking queue, so once it reaches its capacity it will block. If the queue is full, the offer method will return false and the element will not be added to the queue, and that's why we have to retry. Of course in this case it might not be the perfect example, as it will grind the CPU until it can add the element to the queue. So maybe adding Thread.sleep(50), a sleep of 50 milliseconds, could be good here.
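
A minimal sketch of what that back-off could look like (the 50 ms value is an arbitrary choice; ArrayBlockingQueue also has a blocking put method that simply waits for free space):

// retry with a small pause instead of spinning at full speed
while (!queue.offer(n)) {
  Thread.sleep(50) // wait 50 milliseconds before trying again
}

// alternatively, put blocks until there is room in the queue
queue.put(n)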

Now let's look at the consumer. Here the job is simple: we use take, which will simply block and wait until there is something to get from the queue. In most cases this is the behavior we want: a thread simply sitting there and waiting for something to appear in the queue.
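
If we don't want a consumer to wait forever, there is also a timed poll that gives up after a timeout. A minimal sketch, using a String queue here so that the null returned on timeout is easy to check:

import java.util.concurrent._

val messages = new ArrayBlockingQueue[String](100)

// waits at most 1 second; result is null if nothing arrived in that time
val result = messages.poll(1, TimeUnit.SECONDS)
if (result != null) print(result)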

There is also the option of using the add function to add stuff to the queue, but it will throw an exception in case the queue is full, and I'm not a big fan of handling exceptions in this type of scenario.
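
Just to show what that looks like (a minimal sketch reusing queue and n from the producer above; on a full queue add throws an IllegalStateException):

try {
  queue.add(n) // throws IllegalStateException when there is no room
} catch {
  case e: IllegalStateException => // the element was not added, decide what to do about it
}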

More info about the ArrayBlockingQueue API can be found here: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ArrayBlockingQueue.html

ConcurrentHashMap[K,V]

ConcurrentHashMap lets many threads use a single dictionary/hash. This is great, as it does all the synchronization work for us. Of course, frequent writes/updates from many threads will make it perform very slowly, but if we can e.g. use it as a place to store reduction results, that is a great simplification.

If we use it like this:

import java.util.concurrent._

// sums the numbers 1..times and adds the result to whatever is stored under `key`
case class Mapper(key: String, times: Int, hash: ConcurrentHashMap[String, Int]) extends Runnable {
  def run(){
    val sum = (1 to times).sum
    // get followed by put is NOT atomic: two threads updating the same key can race here
    hash.put(key, sum + hash.get(key))
  }
}

object Example {

  val resultHash = new ConcurrentHashMap[String, Int]()

  def main(args: Array[String]) : Unit = {

    val threads = Array(
      new Thread(Mapper("one", 1000, resultHash)),
      new Thread(Mapper("two", 1000, resultHash)),
      new Thread(Mapper("one", 1000, resultHash))
    )

    threads.map(_.start)
    threads.map(_.join)

    print("Key 'one' => " + resultHash.get("one").toString + "\n")
    print("Key 'two' => " + resultHash.get("two").toString + "\n")

  }

}

Of course it will work, but it will often cause trouble: this code is racy :D and it will often end up with the same result for both "one" and "two", even though the map itself is synchronized. We now know we can use this structure from any number of threads, but to make it work correctly it would be better to create another thread that does the reducing, or simply to have a queue where we put partial results and a single thread that updates the hash. Still, this can have its uses: for example one reducer updating the hash, or many different reducers updating dedicated key spaces, while many other threads use the hash in read-only mode. The big issue is when you want to update it, because a plain get-then-put does not give you a transaction, and a transaction is what you really want here.
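
One way around this, without a separate reducer thread, is to retry the update with the compare-and-swap style replace(key, oldValue, newValue) that ConcurrentHashMap provides. A minimal sketch (addTo is just a hypothetical helper name, not part of the example above):

import java.util.concurrent._

object AtomicAdd {
  // adds `delta` to the value stored under `key`, retrying until no other
  // thread has changed the value between our get and our replace
  def addTo(hash: ConcurrentHashMap[String, Int], key: String, delta: Int): Unit = {
    hash.putIfAbsent(key, 0) // make sure the key exists before we start CAS-ing on it
    var done = false
    while (!done) {
      val old = hash.get(key)
      done = hash.replace(key, old, old + delta) // succeeds only if the value is still `old`
    }
  }
}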

Atomic variables

Well, we all love the simplicity of a single variable, and in a concurrent environment it is easy to forget we have left the goodies of the sequential world behind and to use a raw variable to store the result of some execution.

Let us write some dodgy code:

import java.util.concurrent._

case class Counter() extends Runnable {

  def run() {
    // unsynchronized read-modify-write on a shared var: updates from the other thread can be lost
    (1 until 1000).map( n => Example.counter = Example.counter + n)
  }

}

object Example {

  var counter: Int = 0

  def main(args: Array[String]) : Unit = {

    val t = new Thread(Counter())
    val t2 = new Thread(Counter())
    t.start
    t2.start
    t.join
    t2.join

    print(counter)
  }
}

The result should be 999000 but… you will get stuff like 907369… This happens because both threads read and write the same var with no coordination: for example, both read 10 at the same time, both add their n, and both write back, so one of the two updates is lost. That's why we need atomic values :) let's convert this into a less dodgy thing.

import java.util.concurrent.atomic._

case class Counter() extends Runnable {

  def run() {
    // addAndGet performs the read-modify-write as a single atomic operation
    (1 until 1000).map( n => Example.counter.addAndGet(n))
  }

}

object Example {

  // the reference never changes, only the value inside, so a val is enough
  val counter: AtomicInteger = new AtomicInteger()

  def main(args: Array[String]) : Unit = {

    val t = new Thread(Counter())
    val t2 = new Thread(Counter())
    t.start
    t2.start
    t.join
    t2.join

    print(counter.get())
  }
}

After adding AtomicInteger and switching to atomic updates, we always get the same result, and it is the correct answer. It doesn't look great yet because of this Example.counter reference, but that is just an example.
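
AtomicInteger has a few more handy atomic operations worth knowing about; a quick sketch of the standard API (nothing here is specific to the example above):

import java.util.concurrent.atomic._

val c = new AtomicInteger(0)
c.incrementAndGet()     // atomically adds 1 and returns the new value (1)
c.getAndAdd(10)         // atomically adds 10 and returns the previous value (1)
c.compareAndSet(11, 42) // sets the value to 42 only if it is currently 11, returns true
c.get()                 // 42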

A lot more…

There is a lot more in this awesome package to cover. Next time I will cover one more thing, CyclicBarrier, for better synchronization of threads, but for now this is it :). I hope this was a useful read. I don't have much time to play with Scala, so if something looks "too simple" :D yeah, I'm not a Scala expert.

Cheers!
