What is wrong with this thread-safe byte sequence generator?

I need a byte generator that generates values from Byte.MIN_VALUE to Byte.MAX_VALUE. When it reaches MAX_VALUE, it must start over from MIN_VALUE.

I wrote the code using AtomicInteger (see below); however, it does not seem to behave properly when accessed concurrently and slowed down artificially with Thread.sleep(). Without the sleep it runs fine, but I suspect it is simply too fast for the concurrency problems to show up.

Code (with some debugging code added):

    import java.util.concurrent.atomic.AtomicInteger;

    public class ByteGenerator {
        private static final int INITIAL_VALUE = Byte.MIN_VALUE-1;

        private AtomicInteger counter = new AtomicInteger(INITIAL_VALUE);
        private AtomicInteger resetCounter = new AtomicInteger(0);

        private boolean isSlow = false;
        private long startTime;

        public byte nextValue() {
            int next = counter.incrementAndGet();
            //if (isSlow) slowDown(5);
            if (next > Byte.MAX_VALUE) {
                synchronized(counter) {
                    int i = counter.get();
                    //if value is still larger than max byte value, we reset it
                    if (i > Byte.MAX_VALUE) {
                        counter.set(INITIAL_VALUE);
                        resetCounter.incrementAndGet();
                        if (isSlow) slowDownAndLog(10, "resetting");
                    } else {
                        if (isSlow) slowDownAndLog(1, "missed");
                    }
                    next = counter.incrementAndGet();
                }
            }
            return (byte) next;
        }

        private void slowDown(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
            }
        }

        private void slowDownAndLog(long millis, String msg) {
            slowDown(millis);
            System.out.println(resetCounter + " "
                    + (System.currentTimeMillis()-startTime) + " "
                    + Thread.currentThread().getName() + ": " + msg);
        }

        public void setSlow(boolean isSlow) {
            this.isSlow = isSlow;
        }

        public void setStartTime(long startTime) {
            this.startTime = startTime;
        }
    }

And, the test:

    import static org.junit.Assert.assertEquals;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.junit.Test;

    public class ByteGeneratorTest {
        @Test
        public void testGenerate() throws Exception {
            ByteGenerator g = new ByteGenerator();
            for (int n = 0; n < 10; n++) {
                for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
                    assertEquals(i, g.nextValue());
                }
            }
        }

        @Test
        public void testGenerateMultiThreaded() throws Exception {
            final ByteGenerator g = new ByteGenerator();
            g.setSlow(true);
            final AtomicInteger[] counters = new AtomicInteger[Byte.MAX_VALUE-Byte.MIN_VALUE+1];
            for (int i = 0; i < counters.length; i++) {
                counters[i] = new AtomicInteger(0);
            }
            Thread[] threads = new Thread[100];
            final CountDownLatch latch = new CountDownLatch(threads.length);
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(new Runnable() {
                    public void run() {
                        try {
                            for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
                                byte value = g.nextValue();
                                counters[value-Byte.MIN_VALUE].incrementAndGet();
                            }
                        } finally {
                            latch.countDown();
                        }
                    }
                }, "generator-client-" + i);
                threads[i].setDaemon(true);
            }
            g.setStartTime(System.currentTimeMillis());
            for (int i = 0; i < threads.length; i++) {
                threads[i].start();
            }
            latch.await();
            //print out the number of hits for each value
            for (int i = 0; i < counters.length; i++) {
                System.out.println("value #" + (i+Byte.MIN_VALUE) + ": " + counters[i].get());
            }
            //every value must have been hit exactly once per thread
            for (int i = 0; i < counters.length; i++) {
                assertEquals("value #" + (i+Byte.MIN_VALUE), threads.length, counters[i].get());
            }
        }
    }

On my dual-core machine, value #-128 gets 146 hits (every value should get exactly 100 hits, since we have 100 threads).

If anyone has any idea what is wrong with this code, I'm all ears/eyes.

UPDATE: for those who are in a hurry and don't want to scroll down, the correct (and shortest and most elegant) way to solve this in Java is:

    public byte nextValue() {
        return (byte) counter.incrementAndGet();
    }

Thanks, Heinz!

+6
5 answers

You decide to call incrementAndGet() based on a stale counter.get() value. The counter may reach MAX_VALUE again before you actually perform incrementAndGet() on it.

    if (next > Byte.MAX_VALUE) {
        synchronized(counter) {
            int i = counter.get();
            //here you make sure that the counter is not over MAX_VALUE
            if (i > Byte.MAX_VALUE) {
                counter.set(INITIAL_VALUE);
                resetCounter.incrementAndGet();
                if (isSlow) slowDownAndLog(10, "resetting");
            } else {
                if (isSlow) slowDownAndLog(1, "missed");
                //the counter can reach MAX_VALUE again if you wait here long enough
            }
            next = counter.incrementAndGet();
            //here you increment and return a counter that may have gone past MAX_VALUE in the meantime
        }
    }
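The interleaving described above can be replayed deterministically on a single thread. The following is only a sketch of one unlucky schedule, not a reproduction of the actual test run: the class name StaleResetReplay and the replay method are made up for illustration, and the "other threads" are simulated by a plain loop.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Single-threaded replay of one unlucky interleaving of the question's nextValue().
// StaleResetReplay and replay() are hypothetical names used only for this illustration.
class StaleResetReplay {

    static final int INITIAL_VALUE = Byte.MIN_VALUE - 1;

    /** Returns the byte that "thread A" ends up returning. */
    public static byte replay(AtomicInteger counter) {
        counter.set(Byte.MAX_VALUE);           // 127 was just handed out

        // Thread A overflows the counter and enters the synchronized block.
        int next = counter.incrementAndGet();  // 128 > Byte.MAX_VALUE
        if (counter.get() > Byte.MAX_VALUE) {
            counter.set(INITIAL_VALUE);        // reset; A now sleeps in slowDownAndLog
        }

        // While A sleeps, other threads hand out a full round: -128 .. 127.
        for (int i = Byte.MIN_VALUE; i <= Byte.MAX_VALUE; i++) {
            counter.incrementAndGet();
        }

        // A wakes up and increments a counter that is at MAX_VALUE again.
        next = counter.incrementAndGet();      // 128
        return (byte) next;                    // narrows to -128, already handed out
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        System.out.println("A returns " + replay(counter)
                + ", counter is left at " + counter.get());
    }
}
```

In this schedule thread A returns -128 a second time and leaves the counter above MAX_VALUE, so the next caller resets it and -128 is handed out yet again, which is consistent with the surplus of hits on the lowest values.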

To make it work, you need to make sure that no decision is based on stale information. Either reset the counter, or return the value you obtained from it.

    public byte nextValue() {
        int next = counter.incrementAndGet();
        if (next > Byte.MAX_VALUE) {
            synchronized(counter) {
                next = counter.incrementAndGet();
                //if value is still larger than max byte value, we reset it
                if (next > Byte.MAX_VALUE) {
                    counter.set(INITIAL_VALUE + 1);
                    next = INITIAL_VALUE + 1;
                    resetCounter.incrementAndGet();
                    if (isSlow) slowDownAndLog(10, "resetting");
                } else {
                    if (isSlow) slowDownAndLog(1, "missed");
                }
            }
        }
        return (byte) next;
    }
+5

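Initially, Java stored all fields as 4- or 8-byte values, even short and byte. Operations on those fields would simply apply bit masking to shrink them to bytes. Casting an int to byte does the same thing: it keeps only the low 8 bits, so a counter that steps from 127 to 128 yields -128 after the cast. Thus we can very easily do this: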

    public byte nextValue() {
        return (byte) counter.incrementAndGet();
    }

Fun little puzzle, thanks Nimes :-)
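To see the wrap-around in action, here is a small self-contained sketch. The class name WrappingByteGenerator is made up for illustration; the counter starts at Byte.MIN_VALUE - 1, as in the question, so that the first call returns Byte.MIN_VALUE.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper around the one-line solution, for demonstration only.
class WrappingByteGenerator {
    // Start one below MIN_VALUE so the first call returns Byte.MIN_VALUE.
    private final AtomicInteger counter = new AtomicInteger(Byte.MIN_VALUE - 1);

    public byte nextValue() {
        // The narrowing cast keeps only the low 8 bits of the int,
        // so 128 becomes -128 without any explicit reset.
        return (byte) counter.incrementAndGet();
    }

    public static void main(String[] args) {
        WrappingByteGenerator g = new WrappingByteGenerator();
        byte last = 0;
        for (int i = 0; i < 256; i++) {
            last = g.nextValue();
        }
        // After one full round the last value is 127 and the next one wraps.
        System.out.println(last + " -> " + g.nextValue()); // prints "127 -> -128"
    }
}
```

Because incrementAndGet() is a single atomic operation and the cast only touches the returned copy, there is no shared state left to reset, which is why no synchronized block is needed.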

+8

Your synchronized block contains only the body of the if. It should wrap the whole method, including the if statement itself. Or just make your nextValue method synchronized. By the way, in that case you don't need atomic variables at all.

I hope this works for you. Try to use atomic variables only if you really need the highest-performance code, i.e. when the synchronized statement is what bothers you. IMHO, in most cases it doesn't.
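A minimal sketch of such a fully synchronized variant, assuming a plain int field is acceptable (the class name SynchronizedByteGenerator is made up for illustration):

```java
// Hypothetical synchronized variant; no atomic classes are involved.
class SynchronizedByteGenerator {
    // Next value to hand out; only accessed while holding the lock on "this".
    private int next = Byte.MIN_VALUE;

    public synchronized byte nextValue() {
        byte result = (byte) next;
        // The check and the update happen under the same lock,
        // so no other thread can observe a stale value in between.
        next = (next == Byte.MAX_VALUE) ? Byte.MIN_VALUE : next + 1;
        return result;
    }
}
```

The whole read-check-update sequence is one critical section here, so the stale-value problem from the question cannot occur, at the cost of taking a lock on every call.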

+3

If I understand correctly, you care that the results of nextValue lie in the range Byte.MIN_VALUE to Byte.MAX_VALUE, and you don't care about the value stored in the counter. In that case you can map the integers onto bytes so that you get the desired enumeration behavior:

    private static final int VALUE_RANGE = Byte.MAX_VALUE - Byte.MIN_VALUE + 1;
    private final AtomicInteger counter = new AtomicInteger(0);

    public byte nextValue() {
        return (byte) (counter.incrementAndGet() % VALUE_RANGE + Byte.MIN_VALUE - 1);
    }

Beware, this is untested code. But the idea should work.

+2

I coded up the following version of nextValue using compareAndSet, which is designed for use in a non-synchronized block. It passed your unit tests.

Oh, and I introduced new constants for MIN_VALUE and MAX_VALUE, but you can ignore them if you like:

    static final int LOWEST_VALUE = Byte.MIN_VALUE;
    static final int HIGHEST_VALUE = Byte.MAX_VALUE;

    private AtomicInteger counter = new AtomicInteger(LOWEST_VALUE - 1);
    private AtomicInteger resetCounter = new AtomicInteger(0);

    public byte nextValue() {
        int oldValue;
        int newValue;

        do {
            oldValue = counter.get();
            if (oldValue >= HIGHEST_VALUE) {
                newValue = LOWEST_VALUE;
                resetCounter.incrementAndGet();
                if (isSlow) slowDownAndLog(10, "resetting");
            } else {
                newValue = oldValue + 1;
                if (isSlow) slowDownAndLog(1, "missed");
            }
        } while (!counter.compareAndSet(oldValue, newValue));
        return (byte) newValue;
    }

compareAndSet() works in conjunction with get() to control concurrency.

At the beginning of your critical section, you call get() to retrieve the old value. You can then perform some computation that depends only on the old value to calculate the new value. Finally, you use compareAndSet() to set the new value. If the AtomicInteger is no longer equal to the old value when compareAndSet() is executed (because of concurrent activity), the call fails and you must start all over again.
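On Java 8 and later, this get/compute/compareAndSet retry loop is also available ready-made as AtomicInteger.updateAndGet(). A minimal sketch under that assumption (the class name UpdateAndGetByteGenerator is made up for illustration, and the debugging calls from the code above are left out):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant that lets AtomicInteger run the retry loop itself.
class UpdateAndGetByteGenerator {
    private final AtomicInteger counter = new AtomicInteger(Byte.MIN_VALUE - 1);

    public byte nextValue() {
        // updateAndGet may apply the function several times under contention,
        // so the function must depend only on its argument and have no side effects.
        return (byte) counter.updateAndGet(
                old -> old >= Byte.MAX_VALUE ? Byte.MIN_VALUE : old + 1);
    }
}
```

Note that logging or incrementing a separate statistic inside the update function would be repeated on every retry, which is why the sketch keeps the function pure.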

If you have an extreme amount of concurrency and the computation takes a long time, compareAndSet() may fail many times before it succeeds, and it may be worth collecting statistics on this if you are concerned.
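One possible way to collect such statistics is to count every failed compareAndSet() attempt. The sketch below assumes Java 8 or later for LongAdder; the class name CountingCasByteGenerator and the failedAttempts() accessor are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical CAS-based generator that records how often a retry was needed.
class CountingCasByteGenerator {
    private final AtomicInteger counter = new AtomicInteger(Byte.MIN_VALUE - 1);
    private final LongAdder retries = new LongAdder();

    public byte nextValue() {
        while (true) {
            int oldValue = counter.get();
            int newValue = (oldValue >= Byte.MAX_VALUE) ? Byte.MIN_VALUE : oldValue + 1;
            if (counter.compareAndSet(oldValue, newValue)) {
                return (byte) newValue;
            }
            // Another thread changed the counter between get() and compareAndSet().
            retries.increment();
        }
    }

    /** Total number of compareAndSet() calls that failed so far. */
    public long failedAttempts() {
        return retries.sum();
    }
}
```

With a single caller the count stays at zero; under contention it gives a rough idea of how much work is being repeated.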

I'm not suggesting that this approach is better or worse than a simple synchronized block, as others have proposed; personally, I would probably use a synchronized block for simplicity.

EDIT: To answer your actual question, "Why doesn't my code work?"

Your code has:

    int next = counter.incrementAndGet();
    if (next > Byte.MAX_VALUE) {

Since these two lines are not protected by a synchronized block, several threads can execute them at the same time and all obtain values of next > Byte.MAX_VALUE. They will then all enter the synchronized block and set counter back to INITIAL_VALUE (one after another, as they wait for each other).

Over the years, a huge amount has been written about the pitfalls of trying to gain performance by skipping synchronization where it does not seem necessary. For example, see Double-Checked Locking.

+1

Source: https://habr.com/ru/post/897128/

