Wednesday, October 7, 2009

Thread-safe Time vs Frequency keeper (Java)

This post is to discuss the approach for counting operation frequency across time units. For example:

Question: How many stock prices were updated in the last 10 seconds (per second count)?

Answer: [6, 3, 0, 5, 2, 1, 4, 0, 9]

After 1 second: [4, 6, 3, 0, 5, 2, 1, 4, 0]
After 2 second: [7, 4, 6, 3, 0, 5, 2, 1, 4]

The answer varies depending on when the question is asked. To answer such questions at any point of time, we need a collector that would record the operation frequency per time unit, and slide the counter windows at the end of each time unit.

One solution is shown below: TimeVersusCountKeeper -- compile and run the class to see it in action.

Update: I have re-posted the code without line numbers. I am using this code in a highly multi-threaded message queuing (multiple queues) scenario, and it works fantastically -- there is one counter per queue and I display the live traffic for about 15-16 queues on a webpage that refreshes every second. But again, this may not be a use-case for every need.

import java.util.Arrays;
import java.util.Formatter;

/**
* Keeps count across time slabs.
*/

public class TimeVersusCountKeeper {

private final long[] UNITS;
private final long UNIT_DURATION_MILLIS;
private final String UNIT_DURATION_NAME;

private volatile long till = System.currentTimeMillis();
private final String TILL_LOCK = new String("LOCK");

private String getDurationName(final long unitDurationMillis) {
final int unitCount = UNITS.length;
final String s;
String unitName;
if (unitCount <= 1) {
s = "";
unitName = "unit of duration " + UNIT_DURATION_MILLIS + "ms";
} else {
s = "s";
unitName = "unit(s) of duration " + UNIT_DURATION_MILLIS + "ms each";
}
if (UNIT_DURATION_MILLIS == 1000L) { unitName = "sec"; }
else if (UNIT_DURATION_MILLIS == 60 * 1000L) { unitName = "min"; }
else if (UNIT_DURATION_MILLIS == 60 * 60 * 1000L) { unitName = "hr"; }
else if (UNIT_DURATION_MILLIS == 24 * 60 * 60 * 1000L) { unitName = "day" + s; }
else if (UNIT_DURATION_MILLIS == 7 * 24 * 60 * 60 * 1000L) { unitName = "week" + s; }
return unitName;
}

private synchronized void update() {
long diff = 0;
synchronized (TILL_LOCK) {
diff = System.currentTimeMillis() - till;
}

if (diff > UNIT_DURATION_MILLIS) {
final long TRUNCATE_UNITS_COUNT = diff / UNIT_DURATION_MILLIS;
synchronized (UNITS) {
if (TRUNCATE_UNITS_COUNT >= UNITS.length) {
Arrays.fill(UNITS, 0L);
} else {
System.arraycopy(UNITS, 0, UNITS, (int) TRUNCATE_UNITS_COUNT,
UNITS.length - (int) TRUNCATE_UNITS_COUNT);
for (int i = 0; i < TRUNCATE_UNITS_COUNT; i++) {
UNITS[i] = 0;
}
}
}
synchronized (TILL_LOCK) {
till += TRUNCATE_UNITS_COUNT * UNIT_DURATION_MILLIS;
//till = System.currentTimeMillis();
}
}
}

public TimeVersusCountKeeper() {
UNITS = new long[10]; // last 10
UNIT_DURATION_MILLIS = 1000L; // seconds
UNIT_DURATION_NAME = getDurationName(UNIT_DURATION_MILLIS);
}

public TimeVersusCountKeeper(final int maxUnitsCount,
final long unitLengthMillis) {
this.UNITS = new long[maxUnitsCount];
this.UNIT_DURATION_MILLIS = unitLengthMillis;
UNIT_DURATION_NAME = getDurationName(UNIT_DURATION_MILLIS);
}

public void incrementBy(final long count) {
update();
synchronized (UNITS) {
UNITS[0] += count;
}
}

public void setCount(final long count) {
update();
synchronized (UNITS) {
UNITS[0] = count;
}
}

public long[] getElements() {
update();
synchronized (UNITS) {
return UNITS.clone();
}
}

@Override
public String toString() {
int unitsCount = 0;
synchronized (UNITS) {
unitsCount = UNITS.length;
}
return new StringBuilder()
.append(unitsCount)
.append(' ')
.append(UNIT_DURATION_NAME)
.append(": ")
.append(getElementsAsString(1) /*Arrays.toString(units)*/)
.toString();
}

public String getElementsAsString(final int minElementWidth) {
final StringBuilder sb = new StringBuilder();
final Formatter numFormatter = new Formatter(sb);
final String format = "%" + minElementWidth + "d";
final long[] array = getElements();
long totalSum = 0;
String delim = "";
sb.append('[');
for (int i = 0; array != null && i < array.length; i++) {
numFormatter.format(delim + format, array[i]);
totalSum += array[i];
delim = ", ";
}
sb.append("](Avg:");
numFormatter.format(format, (long) (totalSum / array.length));
sb.append(')');
return sb.toString();
}

public static void main(String[] args) {
final TimeVersusCountKeeper keeper = new TimeVersusCountKeeper();
for (int i = 0; i < 100; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {}
keeper.incrementBy(i);
for (int j = 0; j < 100; j++) {
keeper.incrementBy(1);
}
System.out.println(keeper.toString());
}
}

}

2 comments:

  1. good one but why dont u just use esper libraray...u can do much more than this

    ReplyDelete
  2. I didn't know about Esper -- thanks for sharing the info. I will look at Esper's details. Nevertheless, TimeVersusCountKeeper fits into a single class and is dead simple to use. :-)

    ReplyDelete

Disqus for Char Sequence