Showing posts with label concurrency. Show all posts
Showing posts with label concurrency. Show all posts

Saturday, October 17, 2009

Collections.synchronizedXxx() methods are bad for concurrent scenarios (Java)

The java.util.Collections class provides easy factory methods for collections and maps. In this post, I am trying to highlight why the synchronizedXxx() methods are bad for concurrent scenarios. If you look at the source code of Sun JDK and IBM JDK (I haven't checked Oracle/BEA's) for these methods:


Collections.synchronizedList
Collections.synchronizedSet
Collections.synchronizedMap


you will find they essentially maintain a storage-object-level mutex -- every access obtains a lock upon that mutex to maintain thread-safety before going ahead with the operation. For concurrent scenarios, this is a disaster because you can only invoke one operation at a time.

Fortunately, there are few alternatives you can look at right inside the JDK. For maps, you can use:


java.util.concurrent.ConcurrentHashMap


and for lists or sets, you can look at these:


java.util.concurrent.CopyOnWriteArrayList
java.util.concurrent.CopyOnWriteArraySet


Java 6 users can also use these (Note: they are not O(1), but rather O(log(n)) operations):


java.util.concurrent.ConcurrentSkipListMap
java.util.concurrent.ConcurrentSkipListSet


However, there are few points worth knowing:

1. CopyOnWriteXxx collections are suited only for those scenarios where the read operations hugely outnumber the write operations.

2. Concurrent scenarios are better dealt with a strategy at application architecture level rather than brute force use of concurrency-optimized collections.

3. For concurrent scenarios, isolate operations with lifecycle-managed threads each dealing with immutable objects and/or small concurrent datasets rather than giant thread-safe ones. Minimize obtaining of locks rather than making everything thread-safe.

Please let me know what you think.

Wednesday, October 7, 2009

Thread-safe Time vs Frequency keeper (Java)

This post is to discuss the approach for counting operation frequency across time units. For example:

Question: How many stock prices were updated in the last 10 seconds (per second count)?

Answer: [6, 3, 0, 5, 2, 1, 4, 0, 9]

After 1 second: [4, 6, 3, 0, 5, 2, 1, 4, 0]
After 2 second: [7, 4, 6, 3, 0, 5, 2, 1, 4]

The answer varies depending on when the question is asked. To answer such questions at any point of time, we need a collector that would record the operation frequency per time unit, and slide the counter windows at the end of each time unit.

One solution is shown below: TimeVersusCountKeeper -- compile and run the class to see it in action.

Update: I have re-posted the code without line numbers. I am using this code in a highly multi-threaded message queuing (multiple queues) scenario, and it works fantastically -- there is one counter per queue and I display the live traffic for about 15-16 queues on a webpage that refreshes every second. But again, this may not be a use-case for every need.

import java.util.Arrays;
import java.util.Formatter;

/**
* Keeps count across time slabs.
*/

public class TimeVersusCountKeeper {

private final long[] UNITS;
private final long UNIT_DURATION_MILLIS;
private final String UNIT_DURATION_NAME;

private volatile long till = System.currentTimeMillis();
private final String TILL_LOCK = new String("LOCK");

private String getDurationName(final long unitDurationMillis) {
final int unitCount = UNITS.length;
final String s;
String unitName;
if (unitCount <= 1) {
s = "";
unitName = "unit of duration " + UNIT_DURATION_MILLIS + "ms";
} else {
s = "s";
unitName = "unit(s) of duration " + UNIT_DURATION_MILLIS + "ms each";
}
if (UNIT_DURATION_MILLIS == 1000L) { unitName = "sec"; }
else if (UNIT_DURATION_MILLIS == 60 * 1000L) { unitName = "min"; }
else if (UNIT_DURATION_MILLIS == 60 * 60 * 1000L) { unitName = "hr"; }
else if (UNIT_DURATION_MILLIS == 24 * 60 * 60 * 1000L) { unitName = "day" + s; }
else if (UNIT_DURATION_MILLIS == 7 * 24 * 60 * 60 * 1000L) { unitName = "week" + s; }
return unitName;
}

private synchronized void update() {
long diff = 0;
synchronized (TILL_LOCK) {
diff = System.currentTimeMillis() - till;
}

if (diff > UNIT_DURATION_MILLIS) {
final long TRUNCATE_UNITS_COUNT = diff / UNIT_DURATION_MILLIS;
synchronized (UNITS) {
if (TRUNCATE_UNITS_COUNT >= UNITS.length) {
Arrays.fill(UNITS, 0L);
} else {
System.arraycopy(UNITS, 0, UNITS, (int) TRUNCATE_UNITS_COUNT,
UNITS.length - (int) TRUNCATE_UNITS_COUNT);
for (int i = 0; i < TRUNCATE_UNITS_COUNT; i++) {
UNITS[i] = 0;
}
}
}
synchronized (TILL_LOCK) {
till += TRUNCATE_UNITS_COUNT * UNIT_DURATION_MILLIS;
//till = System.currentTimeMillis();
}
}
}

public TimeVersusCountKeeper() {
UNITS = new long[10]; // last 10
UNIT_DURATION_MILLIS = 1000L; // seconds
UNIT_DURATION_NAME = getDurationName(UNIT_DURATION_MILLIS);
}

public TimeVersusCountKeeper(final int maxUnitsCount,
final long unitLengthMillis) {
this.UNITS = new long[maxUnitsCount];
this.UNIT_DURATION_MILLIS = unitLengthMillis;
UNIT_DURATION_NAME = getDurationName(UNIT_DURATION_MILLIS);
}

public void incrementBy(final long count) {
update();
synchronized (UNITS) {
UNITS[0] += count;
}
}

public void setCount(final long count) {
update();
synchronized (UNITS) {
UNITS[0] = count;
}
}

public long[] getElements() {
update();
synchronized (UNITS) {
return UNITS.clone();
}
}

@Override
public String toString() {
int unitsCount = 0;
synchronized (UNITS) {
unitsCount = UNITS.length;
}
return new StringBuilder()
.append(unitsCount)
.append(' ')
.append(UNIT_DURATION_NAME)
.append(": ")
.append(getElementsAsString(1) /*Arrays.toString(units)*/)
.toString();
}

public String getElementsAsString(final int minElementWidth) {
final StringBuilder sb = new StringBuilder();
final Formatter numFormatter = new Formatter(sb);
final String format = "%" + minElementWidth + "d";
final long[] array = getElements();
long totalSum = 0;
String delim = "";
sb.append('[');
for (int i = 0; array != null && i < array.length; i++) {
numFormatter.format(delim + format, array[i]);
totalSum += array[i];
delim = ", ";
}
sb.append("](Avg:");
numFormatter.format(format, (long) (totalSum / array.length));
sb.append(')');
return sb.toString();
}

public static void main(String[] args) {
final TimeVersusCountKeeper keeper = new TimeVersusCountKeeper();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {}
keeper.incrementBy(i);
for (int j = 0; j < 100; j++) {
keeper.incrementBy(1);
}
System.out.println(keeper.toString());
}
}

}

Disqus for Char Sequence