Findbugs:

- Fix several 'increment of volatile is not atomic' all over
    Remaining: UDP PeerState.java, to be checked in separately
  - Comment out all of unused MessageStateMonitor
This commit is contained in:
zzz
2013-11-12 19:28:23 +00:00
parent df84a2fcd0
commit b53ed94e8f
11 changed files with 91 additions and 65 deletions

View File

@@ -6,6 +6,7 @@ import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
@@ -40,7 +41,7 @@ class ConnectionManager {
private final Map<Long, PingRequest> _pendingPings;
private volatile boolean _throttlersInitialized;
private final ConnectionOptions _defaultOptions;
private volatile int _numWaiting;
private final AtomicInteger _numWaiting = new AtomicInteger();
private long _soTimeout;
private volatile ConnThrottler _minuteThrottler;
private volatile ConnThrottler _hourThrottler;
@@ -299,24 +300,24 @@ class ConnectionManager {
long expiration = _context.clock().now() + opts.getConnectTimeout();
if (opts.getConnectTimeout() <= 0)
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
_numWaiting++;
_numWaiting.incrementAndGet();
while (true) {
long remaining = expiration - _context.clock().now();
if (remaining <= 0) {
_log.logAlways(Log.WARN, "Refusing to connect since we have exceeded our max of "
+ _defaultOptions.getMaxConns() + " connections");
_numWaiting--;
_numWaiting.decrementAndGet();
return null;
}
if (locked_tooManyStreams()) {
int max = _defaultOptions.getMaxConns();
// allow a full buffer of pending/waiting streams
if (_numWaiting > max) {
if (_numWaiting.get() > max) {
_log.logAlways(Log.WARN, "Refusing connection since we have exceeded our max of "
+ max + " and there are " + _numWaiting
+ " waiting already");
_numWaiting--;
_numWaiting.decrementAndGet();
return null;
}
@@ -346,8 +347,14 @@ class ConnectionManager {
if (opts.getConnectDelay() <= 0) {
con.waitForConnect();
}
if (_numWaiting > 0)
_numWaiting--;
// safe decrement
for (;;) {
int n = _numWaiting.get();
if (n <= 0)
break;
if (_numWaiting.compareAndSet(n, n - 1))
break;
}
_context.statManager().addRateData("stream.connectionCreated", 1, 0);
return con;

View File

@@ -36,9 +36,9 @@ class MessageInputStream extends InputStream {
private final List<ByteArray> _readyDataBlocks;
private int _readyDataBlockIndex;
/** highest message ID used in the readyDataBlocks */
private volatile long _highestReadyBlockId;
private long _highestReadyBlockId;
/** highest overall message ID */
private volatile long _highestBlockId;
private long _highestBlockId;
/**
* Message ID (Long) to ByteArray for blocks received
* out of order when there are lower IDs not yet
@@ -76,17 +76,19 @@ class MessageInputStream extends InputStream {
* @return highest data block ID completely received or -1 for none
*/
public long getHighestReadyBockId() {
// not synchronized as it doesnt hurt to read a too-low value
synchronized (_dataLock) {
return _highestReadyBlockId;
}
}
/**
* @return highest data block ID received or -1 for none
*/
public long getHighestBlockId() {
// not synchronized as it doesnt hurt to read a too-low value
synchronized (_dataLock) {
return _highestBlockId;
}
}
/**
* Retrieve the message IDs that are holes in our sequence - ones

View File

@@ -24,7 +24,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
private final Destination _to;
private SessionKey _keyUsed;
private final long _createdOn;
private volatile int _numSends;
private final AtomicInteger _numSends = new AtomicInteger();
private volatile long _lastSend;
private long _acceptedOn;
/** LOCKING: this */
@@ -99,9 +99,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
public void prepare() {
if (_connection != null)
_connection.getInputStream().updateAcks(this);
if (_numSends > 0) {
int numSends = _numSends.get();
if (numSends > 0) {
// so we can debug to differentiate resends
setOptionalDelay(_numSends * 1000);
setOptionalDelay(numSends * 1000);
setFlag(FLAG_DELAY_REQUESTED);
}
}
@@ -109,7 +110,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
public long getCreatedOn() { return _createdOn; }
public long getLifetime() { return _context.clock().now() - _createdOn; }
public void incrementSends() {
_numSends++;
_numSends.incrementAndGet();
_lastSend = _context.clock().now();
}
@@ -152,7 +153,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
else
return (int)(_ackOn - _createdOn);
}
public int getNumSends() { return _numSends; }
public int getNumSends() { return _numSends.get(); }
public long getLastSend() { return _lastSend; }
/** @return null if not bound */
@@ -166,7 +167,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
final int cnt = _nackCount.incrementAndGet();
SimpleTimer2.TimedEvent evt = _resendEvent;
if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) &&
(_numSends == 1 || _lastSend < _context.clock().now() - 4*1000)) { // Don't fast retx if we recently resent it
(_numSends.get() == 1 || _lastSend < _context.clock().now() - 4*1000)) { // Don't fast retx if we recently resent it
_retransmitted = true;
evt.reschedule(0);
// the predicate used to be '+', changing to '-' --zab
@@ -174,13 +175,13 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
if (_log.shouldLog(Log.DEBUG)) {
final String log = String.format("%s nacks and retransmits. Criteria: nacks=%d, retransmitted=%b,"+
" numSends=%d, lastSend=%d, now=%d",
toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now());
toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
_log.debug(log);
}
} else if (_log.shouldLog(Log.DEBUG)) {
final String log = String.format("%s nack but no retransmit. Criteria: nacks=%d, retransmitted=%b,"+
" numSends=%d, lastSend=%d, now=%d",
toString(), cnt, _retransmitted, _numSends, _lastSend, _context.clock().now());
toString(), cnt, _retransmitted, _numSends.get(), _lastSend, _context.clock().now());
_log.debug(log);
}
}
@@ -203,8 +204,9 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus {
buf.append(" ack after ").append(getAckTime());
}
if (_numSends > 1)
buf.append(" sent ").append(_numSends).append(" times");
int numSends = _numSends.get();
if (numSends > 1)
buf.append(" sent ").append(numSends).append(" times");
if (isFlagSet(FLAG_SYNCHRONIZE |
FLAG_CLOSE |

View File

@@ -1,5 +1,7 @@
package net.i2p.router;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.data.DataHelper;
/**
@@ -9,7 +11,7 @@ import net.i2p.data.DataHelper;
*/
public class JobStats {
private final String _job;
private volatile long _numRuns;
private final AtomicLong _numRuns = new AtomicLong();
private volatile long _totalTime;
private volatile long _maxTime;
private volatile long _minTime;
@@ -26,7 +28,7 @@ public class JobStats {
}
public void jobRan(long runTime, long lag) {
_numRuns++;
_numRuns.incrementAndGet();
_totalTime += runTime;
if ( (_maxTime < 0) || (runTime > _maxTime) )
_maxTime = runTime;
@@ -40,13 +42,14 @@ public class JobStats {
}
public String getName() { return _job; }
public long getRuns() { return _numRuns; }
public long getRuns() { return _numRuns.get(); }
public long getTotalTime() { return _totalTime; }
public long getMaxTime() { return _maxTime; }
public long getMinTime() { return _minTime; }
public long getAvgTime() {
if (_numRuns > 0)
return _totalTime / _numRuns;
long numRuns = _numRuns.get();
if (numRuns > 0)
return _totalTime / numRuns;
else
return 0;
}
@@ -54,8 +57,9 @@ public class JobStats {
public long getMaxPendingTime() { return _maxPendingTime; }
public long getMinPendingTime() { return _minPendingTime; }
public long getAvgPendingTime() {
if (_numRuns > 0)
return _totalPendingTime / _numRuns;
long numRuns = _numRuns.get();
if (numRuns > 0)
return _totalPendingTime / numRuns;
else
return 0;
}

View File

@@ -8,6 +8,7 @@ import net.i2p.util.Log;
* @deprecated unused
*/
public class MessageStateMonitor {
/****
private Log _log;
private volatile int _inboundLiveCount;
private volatile int _inboundReadCount;
@@ -65,4 +66,5 @@ public class MessageStateMonitor {
public int getInboundReadCount() { return _inboundReadCount; }
public int getOutboundLiveCount() { return _outboundLiveCount; }
public int getOutboundDiscardedCount() { return _outboundDiscardedCount; }
****/
}

View File

@@ -1,5 +1,7 @@
package net.i2p.router;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
/**
@@ -15,7 +17,7 @@ class RouterDoSThrottle extends RouterThrottleImpl {
}
private volatile long _currentLookupPeriod;
private volatile int _currentLookupCount;
private final AtomicInteger _currentLookupCount = new AtomicInteger();
// if we receive over 20 netDb lookups in 10 seconds, someone is acting up
private static final long LOOKUP_THROTTLE_PERIOD = 10*1000;
private static final long LOOKUP_THROTTLE_MAX = 20;
@@ -30,10 +32,10 @@ class RouterDoSThrottle extends RouterThrottleImpl {
long now = _context.clock().now();
if (_currentLookupPeriod + LOOKUP_THROTTLE_PERIOD > now) {
// same period, check for DoS
_currentLookupCount++;
if (_currentLookupCount >= LOOKUP_THROTTLE_MAX) {
_context.statManager().addRateData("router.throttleNetDbDoS", _currentLookupCount, 0);
int rand = _context.random().nextInt(_currentLookupCount);
int cnt = _currentLookupCount.incrementAndGet();
if (cnt >= LOOKUP_THROTTLE_MAX) {
_context.statManager().addRateData("router.throttleNetDbDoS", cnt, 0);
int rand = _context.random().nextInt(cnt);
if (rand > LOOKUP_THROTTLE_MAX) {
return false;
} else {
@@ -47,7 +49,7 @@ class RouterDoSThrottle extends RouterThrottleImpl {
// on to the next period, reset counter, no DoS
// (no, I'm not worried about concurrency here)
_currentLookupPeriod = now;
_currentLookupCount = 1;
_currentLookupCount.set(1);
return true;
}
}

View File

@@ -143,7 +143,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
********/
int count = 0; // keep a separate count since _lookupsRemaining could be decremented elsewhere
for (int i = 0; _lookupsRemaining < CONCURRENT_SEARCHES && i < floodfillPeers.size(); i++) {
for (int i = 0; _lookupsRemaining.get() < CONCURRENT_SEARCHES && i < floodfillPeers.size(); i++) {
Hash peer = floodfillPeers.get(i);
if (peer.equals(getContext().routerHash()))
continue;
@@ -177,7 +177,7 @@ class FloodOnlySearchJob extends FloodSearchJob {
_log.info(getJobId() + ": Floodfill search for " + _key + " to " + peer);
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), peer);
count++;
_lookupsRemaining++;
_lookupsRemaining.incrementAndGet();
}
if (count <= 0) {

View File

@@ -2,6 +2,7 @@ package net.i2p.router.networkdb.kademlia;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
import net.i2p.router.Job;
@@ -29,7 +30,7 @@ public class FloodSearchJob extends JobImpl {
protected long _expiration;
protected int _timeoutMs;
protected final boolean _isLease;
protected volatile int _lookupsRemaining;
protected final AtomicInteger _lookupsRemaining = new AtomicInteger();
protected volatile boolean _dead;
protected final long _created;
@@ -148,16 +149,20 @@ public class FloodSearchJob extends JobImpl {
protected Hash getKey() { return _key; }
/**
* TODO AtomicInteger?
* @return number remaining after decrementing
*/
protected int decrementRemaining() {
if (_lookupsRemaining > 0)
return (--_lookupsRemaining);
// safe decrement
for (;;) {
int n = _lookupsRemaining.get();
if (n <= 0)
return 0;
if (_lookupsRemaining.compareAndSet(n, n - 1))
return n - 1;
}
}
protected int getLookupsRemaining() { return _lookupsRemaining; }
protected int getLookupsRemaining() { return _lookupsRemaining.get(); }
/**
* Deprecated, unused, see FOSJ override

View File

@@ -12,6 +12,7 @@ import java.math.BigInteger;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
@@ -29,7 +30,7 @@ class KBucketSet {
private final I2PAppContext _context;
private final LocalHash _us;
private final KBucket _buckets[];
private volatile int _size;
private final AtomicInteger _size = new AtomicInteger();
public final static int BASE = 8; // must go into KEYSIZE_BITS evenly
public final static int KEYSIZE_BITS = Hash.HASH_LENGTH * 8;
@@ -55,7 +56,7 @@ class KBucketSet {
int oldSize = _buckets[bucket].getKeyCount();
int numInBucket = _buckets[bucket].add(peer);
if (numInBucket != oldSize)
_size++;
_size.incrementAndGet();
if (numInBucket > BUCKET_SIZE) {
// perhaps queue up coalesce job? naaahh.. lets let 'er grow for now
}
@@ -72,7 +73,7 @@ class KBucketSet {
*
*/
public int size() {
return _size;
return _size.get();
/*
int size = 0;
for (int i = 0; i < _buckets.length; i++)
@@ -86,7 +87,7 @@ class KBucketSet {
KBucket kbucket = getBucket(bucket);
boolean removed = kbucket.remove(entry);
if (removed)
_size--;
_size.decrementAndGet();
return removed;
}
@@ -95,7 +96,7 @@ class KBucketSet {
for (int i = 0; i < _buckets.length; i++) {
_buckets[i].setEntries(Collections.EMPTY_SET);
}
_size = 0;
_size.set(0);
_us.clearXorCache();
}

View File

@@ -50,7 +50,7 @@ class SingleSearchJob extends FloodOnlySearchJob {
if (_log.shouldLog(Log.INFO))
_log.info(getJobId() + ": Single search for " + _key + " to " + _to);
getContext().tunnelDispatcher().dispatchOutbound(dlm, outTunnel.getSendTunnelId(0), _to);
_lookupsRemaining = 1;
_lookupsRemaining.set(1);
}
@Override

View File

@@ -4,6 +4,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Date;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.router.RouterContext;
import net.i2p.stat.RateStat;
@@ -16,14 +17,14 @@ import net.i2p.util.Log;
public class TunnelHistory {
private final RouterContext _context;
private final Log _log;
private volatile long _lifetimeAgreedTo;
private volatile long _lifetimeRejected;
private final AtomicLong _lifetimeAgreedTo = new AtomicLong();
private final AtomicLong _lifetimeRejected = new AtomicLong();
private volatile long _lastAgreedTo;
private volatile long _lastRejectedCritical;
private volatile long _lastRejectedBandwidth;
private volatile long _lastRejectedTransient;
private volatile long _lastRejectedProbabalistic;
private volatile long _lifetimeFailed;
private final AtomicLong _lifetimeFailed = new AtomicLong();
private volatile long _lastFailed;
private RateStat _rejectRate;
private RateStat _failRate;
@@ -53,11 +54,11 @@ public class TunnelHistory {
}
/** total tunnels the peer has agreed to participate in */
public long getLifetimeAgreedTo() { return _lifetimeAgreedTo; }
public long getLifetimeAgreedTo() { return _lifetimeAgreedTo.get(); }
/** total tunnels the peer has refused to participate in */
public long getLifetimeRejected() { return _lifetimeRejected; }
public long getLifetimeRejected() { return _lifetimeRejected.get(); }
/** total tunnels the peer has agreed to participate in that were later marked as failed prematurely */
public long getLifetimeFailed() { return _lifetimeFailed; }
public long getLifetimeFailed() { return _lifetimeFailed.get(); }
/** when the peer last agreed to participate in a tunnel */
public long getLastAgreedTo() { return _lastAgreedTo; }
/** when the peer last refused to participate in a tunnel with level of critical */
@@ -76,7 +77,7 @@ public class TunnelHistory {
}
public void incrementAgreedTo() {
_lifetimeAgreedTo++;
_lifetimeAgreedTo.incrementAndGet();
_lastAgreedTo = _context.clock().now();
}
@@ -85,7 +86,7 @@ public class TunnelHistory {
* tunnel (large == more severe)
*/
public void incrementRejected(int severity) {
_lifetimeRejected++;
_lifetimeRejected.incrementAndGet();
if (severity >= TUNNEL_REJECT_CRIT) {
_lastRejectedCritical = _context.clock().now();
_rejectRate.addData(1, 1);
@@ -106,7 +107,7 @@ public class TunnelHistory {
* @param pct = probability * 100
*/
public void incrementFailed(int pct) {
_lifetimeFailed++;
_lifetimeFailed.incrementAndGet();
_failRate.addData(pct, 1);
_lastFailed = _context.clock().now();
}
@@ -147,9 +148,9 @@ public class TunnelHistory {
addDate(buf, "lastRejectedBandwidth", _lastRejectedBandwidth, "When was the last time the peer refused to participate in a tunnel (Bandwidth response code)?");
addDate(buf, "lastRejectedTransient", _lastRejectedTransient, "When was the last time the peer refused to participate in a tunnel (Transient load response code)?");
addDate(buf, "lastRejectedProbabalistic", _lastRejectedProbabalistic, "When was the last time the peer refused to participate in a tunnel (Probabalistic response code)?");
add(buf, "lifetimeAgreedTo", _lifetimeAgreedTo, "How many tunnels has the peer ever agreed to participate in?");
add(buf, "lifetimeFailed", _lifetimeFailed, "How many tunnels has the peer ever agreed to participate in that failed prematurely?");
add(buf, "lifetimeRejected", _lifetimeRejected, "How many tunnels has the peer ever refused to participate in?");
add(buf, "lifetimeAgreedTo", _lifetimeAgreedTo.get(), "How many tunnels has the peer ever agreed to participate in?");
add(buf, "lifetimeFailed", _lifetimeFailed.get(), "How many tunnels has the peer ever agreed to participate in that failed prematurely?");
add(buf, "lifetimeRejected", _lifetimeRejected.get(), "How many tunnels has the peer ever refused to participate in?");
out.write(buf.toString().getBytes());
_rejectRate.store(out, "tunnelHistory.rejectRate");
_failRate.store(out, "tunnelHistory.failRate");
@@ -172,9 +173,9 @@ public class TunnelHistory {
_lastRejectedBandwidth = getLong(props, "tunnels.lastRejectedBandwidth");
_lastRejectedTransient = getLong(props, "tunnels.lastRejectedTransient");
_lastRejectedProbabalistic = getLong(props, "tunnels.lastRejectedProbabalistic");
_lifetimeAgreedTo = getLong(props, "tunnels.lifetimeAgreedTo");
_lifetimeFailed = getLong(props, "tunnels.lifetimeFailed");
_lifetimeRejected = getLong(props, "tunnels.lifetimeRejected");
_lifetimeAgreedTo.set(getLong(props, "tunnels.lifetimeAgreedTo"));
_lifetimeFailed.set(getLong(props, "tunnels.lifetimeFailed"));
_lifetimeRejected.set(getLong(props, "tunnels.lifetimeRejected"));
try {
_rejectRate.load(props, "tunnelHistory.rejectRate", true);
if (_log.shouldLog(Log.DEBUG))