Apache Geode can asynchronously distribute batches of events between two separate DistributedSystems, an arrangement called a WAN topology. The events are stored in queues in the local DistributedSystem before being batched and distributed.
If a connection cannot be made to the remote WAN site, the events remain in the GatewaySender’s queue until such time as a connection can be made. At that time, events are batched and sent to the remote site. Once an acknowledgement has been received by the GatewaySender for those events, they are removed from the queue. Whether the events are successfully applied in the remote site is not taken into account. Any exceptions that occur in the remote site are logged in both sites, but once the acknowledgement is received, the events are removed from the queue. This decision was made mainly to prevent issues in the remote site (e.g. memory pressure, partitions offline, etc.) from cascading back to and affecting the local site.
The default behavior can be changed with the gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION Java system property. Setting this property to false causes every batch of events to be retried indefinitely until it succeeds.
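As a minimal sketch of how this property might be set: Java system properties are usually passed on the JVM command line (-Dgemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION=false) or set programmatically before the cache is created. The class and helper names below are hypothetical, and the read-back helper only mirrors the default described above; it is not Geode's own code.

```java
class RetryOnExceptionProperty {
  // Property name taken from the text above.
  static final String PROPERTY = "gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION";

  // Assumption: this runs in the sending member's JVM before the cache
  // (and therefore any GatewaySender) is created.
  static void retryBatchesUntilSuccessful() {
    System.setProperty(PROPERTY, "false");
  }

  // Hypothetical helper: reads the property back, treating "unset" as true
  // to mirror the default behavior described in the article.
  static boolean removeFromQueueOnException() {
    return Boolean.parseBoolean(System.getProperty(PROPERTY, "true"));
  }

  public static void main(String[] args) {
    retryBatchesUntilSuccessful();
    System.out.println(PROPERTY + "=" + removeFromQueueOnException());
  }
}
```

Setting the property on the command line is likely the safer choice, since it does not depend on the order in which application code runs.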
The default behavior can leave a Region’s data inconsistent between the two sites. This article describes a way to verify that a Region’s data in two WAN sites is consistent.
All source code described in this article is available here.
The WanVerificationService is instantiated with a Region name, a Pool connected to site 1 and a Pool connected to site 2. It:
The keySet comparison is done using equals.
The value comparison is done using a ValueComparer, which defines a single method called compare. The default implementation of compare uses equals, so it works as long as the value class implements equals. If the value class does not implement equals, a non-default ValueComparer is required to compare the two values.
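The following sketch illustrates the difference between the two kinds of comparer. The interface is named ValueComparer to match the method signatures shown later in this article, but its exact shape is an assumption, and the Quote class and QUOTE_COMPARER are hypothetical stand-ins for a value class that does not implement equals.

```java
import java.util.Objects;

class ValueComparerExample {
  /** Assumed shape of the comparer: a single compare method. */
  interface ValueComparer {
    boolean compare(Object value1, Object value2);
  }

  /** Default behavior as described above: delegate to equals. */
  static final ValueComparer DEFAULT = (value1, value2) -> Objects.equals(value1, value2);

  /** Hypothetical value class that does NOT override equals. */
  static final class Quote {
    final String cusip;
    final double price;

    Quote(String cusip, double price) {
      this.cusip = cusip;
      this.price = price;
    }
  }

  /** Non-default comparer that compares Quote instances field by field. */
  static final ValueComparer QUOTE_COMPARER = (value1, value2) -> {
    if (!(value1 instanceof Quote) || !(value2 instanceof Quote)) {
      return false;
    }
    Quote quote1 = (Quote) value1;
    Quote quote2 = (Quote) value2;
    return Objects.equals(quote1.cusip, quote2.cusip)
        && Double.compare(quote1.price, quote2.price) == 0;
  };

  public static void main(String[] args) {
    Quote site1Value = new Quote("MRK", 202.65);
    Quote site2Value = new Quote("MRK", 202.65);
    // Without equals, the default falls back to identity comparison.
    System.out.println("default: " + DEFAULT.compare(site1Value, site2Value)); // default: false
    System.out.println("custom: " + QUOTE_COMPARER.compare(site1Value, site2Value)); // custom: true
  }
}
```

With the default comparer, two logically identical Quote objects fetched from different sites would be reported as different, which is why a field-by-field comparer is needed for such classes.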
The resulting comparison is logged. This behavior could be enhanced with a callback that allows the user to take some action to repair the sites.
The getKeySet method:
The getKeySet method is invoked for each site.
private Set getKeySet(String regionName, String siteName) {
  Region region = createRegion(regionName, siteName);
  Set keySet = region.keySetOnServer();
  closeRegion(region);
  return keySet;
}
The createRegion method creates a proxy Region connected to a specific site.
private Region createRegion(String regionName, String siteName) {
  return ((ClientCache) this.cache)
      .createClientRegionFactory(ClientRegionShortcut.PROXY)
      .setPoolName(siteName)
      .create(regionName);
}
Unfortunately, a Region can only be connected to one Pool (and thus one site), and once the Region is connected to that Pool, it can’t be changed. So, to repeatedly connect to one site then the other, the Region must constantly be closed and recreated. Luckily this is an inexpensive operation.
The closeRegion method is used to close a proxy Region. It also closes the ClientMetadataService which clears its known server-to-bucket layout.
private void closeRegion(Region region) {
  region.close();
  ((InternalCache) this.cache).getClientMetadataService().close();
}
The compareKeySets method:
private boolean compareKeySets(Set site1Keys, Set site2Keys) {
  this.builder
      .append("\n\n==============")
      .append("\nComparing keys")
      .append("\n==============");
  boolean allKeysAreEqual = site1Keys.equals(site2Keys);
  if (allKeysAreEqual) {
    this.builder.append("\nAll ").append(site1Keys.size()).append(" keys are equal");
  } else {
    this.builder
        .append("\nAll keys are not equal. Site 1 contains ")
        .append(site1Keys.size())
        .append(" keys. Site 2 contains ")
        .append(site2Keys.size())
        .append(" keys.");
    Set site1Differences = new HashSet(site1Keys);
    site1Differences.removeIf(site2Keys::contains);
    this.builder
        .append("\nSite 1 contains these ")
        .append(site1Differences.size())
        .append(" keys not found in site 2: ")
        .append(site1Differences);
    Set site2Differences = new HashSet(site2Keys);
    site2Differences.removeIf(site1Keys::contains);
    this.builder
        .append("\nSite 2 contains these ")
        .append(site2Differences.size())
        .append(" keys not found in site 1: ")
        .append(site2Differences);
  }
  return allKeysAreEqual;
}
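The difference computation used by compareKeySets can be tried in isolation, without a running cluster. The sketch below applies the same copy-then-removeIf approach to two small in-memory sets; the class name, the keysMissingFrom helper and the sample keys are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

class KeySetDifferenceExample {
  /** Returns the keys that are in keys but not in otherKeys, leaving both inputs untouched. */
  static <K> Set<K> keysMissingFrom(Set<K> keys, Set<K> otherKeys) {
    Set<K> differences = new HashSet<>(keys); // copy so the original key set is not modified
    differences.removeIf(otherKeys::contains);
    return differences;
  }

  public static void main(String[] args) {
    Set<Integer> site1Keys = Set.of(0, 1, 2, 3);
    Set<Integer> site2Keys = Set.of(2, 3, 4);

    // TreeSet is used only to print the keys in a stable order.
    System.out.println("Only in site 1: " + new TreeSet<>(keysMissingFrom(site1Keys, site2Keys))); // [0, 1]
    System.out.println("Only in site 2: " + new TreeSet<>(keysMissingFrom(site2Keys, site1Keys))); // [4]
  }
}
```

Copying each key set before removing elements matters because the original sets are still needed afterwards to drive the value comparison.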
The compareAllValues method iterates the keySet and, for each key, invokes compareSingleValues to compare the value in each site.
private void compareAllValues(String regionName, ValueComparer valueComparer, Set keys, int fromSite, int toSite) {
  this.builder
      .append("\n\n=============================================")
      .append("\nComparing values in site ")
      .append(fromSite)
      .append(" to those in site ")
      .append(toSite)
      .append("\n=============================================");
  this.allValuesAreEqual = true;
  keys.forEach(key -> compareSingleValues(regionName, valueComparer, key));
  if (this.allValuesAreEqual) {
    this.builder
        .append("\nAll values in site ")
        .append(fromSite)
        .append(" are equal to those in site ")
        .append(toSite);
  }
}
To compare the value in each site, the compareSingleValues method:
private void compareSingleValues(String regionName, ValueComparer valueComparer, Object key) {
  // Get the value in site 1
  Region site1Region = createRegion(regionName, this.site1Pool.getName());
  Object site1Value = site1Region.get(key);
  closeRegion(site1Region);

  // Get the value in site 2
  Region site2Region = createRegion(regionName, this.site2Pool.getName());
  Object site2Value = site2Region.get(key);
  closeRegion(site2Region);

  // Compare the values
  boolean valuesAreEqual;
  if (site1Value == null && site2Value == null) {
    valuesAreEqual = true;
  } else if (site1Value == null) {
    valuesAreEqual = valueComparer.compare(site2Value, site1Value);
  } else {
    valuesAreEqual = valueComparer.compare(site1Value, site2Value);
  }
  if (!valuesAreEqual) {
    this.builder
        .append("\nValues are not equal for key=")
        .append(key)
        .append("; site1Value=")
        .append(site1Value)
        .append("; site2Value=")
        .append(site2Value);
    this.allValuesAreEqual = false;
  }
}
Some examples of the results logged are shown below.
If the keys in both sites are equal, but the values are not, a message like the one below is logged:
Verifying entries for region=Trade
==============
Comparing keys
==============
All 20 keys are equal
=============================================
Comparing values in site 1 to those in site 2
=============================================
Values are not equal for key=0; site1Value=Trade(id=0, cusip=MRK, shares=12, price=202.65); site2Value=Trade(id=0, cusip=PFE, shares=11, price=682.45)
Values are not equal for key=1; site1Value=Trade(id=1, cusip=UNH, shares=51, price=995.72); site2Value=Trade(id=1, cusip=BAC, shares=69, price=882.05)
Values are not equal for key=2; site1Value=Trade(id=2, cusip=CMCSA, shares=83, price=684.68); site2Value=Trade(id=2, cusip=TM, shares=98, price=143.51)
Values are not equal for key=3; site1Value=Trade(id=3, cusip=V, shares=42, price=346.58); site2Value=Trade(id=3, cusip=GOOGL, shares=93, price=467.43)
Values are not equal for key=4; site1Value=Trade(id=4, cusip=AXP, shares=53, price=244.85); site2Value=Trade(id=4, cusip=PG, shares=66, price=270.92)
If the keys in both sites are not equal, a message like the one below is logged:
Verifying entries for region=Trade
==============
Comparing keys
==============
All keys are not equal. Site 1 contains 10 keys. Site 2 contains 10 keys.
Site 1 contains these 10 keys not found in site 2: [0, 12, 2, 14, 4, 16, 6, 18, 8, 10]
Site 2 contains these 10 keys not found in site 1: [11, 1, 13, 3, 15, 5, 17, 7, 19, 9]
=============================================
Comparing values in site 1 to those in site 2
=============================================
Values are not equal for key=0; site1Value=Trade(id=0, cusip=CRM, shares=44, price=921.92); site2Value=null
Values are not equal for key=12; site1Value=Trade(id=12, cusip=UNH, shares=56, price=846.28); site2Value=null
Values are not equal for key=2; site1Value=Trade(id=2, cusip=BAC, shares=78, price=939.22); site2Value=null
Values are not equal for key=14; site1Value=Trade(id=14, cusip=TM, shares=34, price=708.74); site2Value=null
Values are not equal for key=4; site1Value=Trade(id=4, cusip=V, shares=96, price=242.98); site2Value=null
...
=============================================
Comparing values in site 2 to those in site 1
=============================================
Values are not equal for key=11; site1Value=null; site2Value=Trade(id=11, cusip=PFE, shares=57, price=85.83)
Values are not equal for key=1; site1Value=null; site2Value=Trade(id=1, cusip=CRM, shares=81, price=796.38)
Values are not equal for key=13; site1Value=null; site2Value=Trade(id=13, cusip=INTC, shares=75, price=412.98)
Values are not equal for key=3; site1Value=null; site2Value=Trade(id=3, cusip=FB, shares=33, price=64.32)
Values are not equal for key=15; site1Value=null; site2Value=Trade(id=15, cusip=CVS, shares=18, price=893.04)
...
The WanVerificationService should be run when the sites are not active and the queues are empty. It does not account for events that are still in the queues or in flight between sites, so if the sites are active or the queues contain events, the results may show differences that would disappear once those events are applied.
The WanVerificationService calls the Region get method on each key in each site. If the Region defines a CacheLoader, it will be invoked if the value is null for that key.
A service like this built into Apache Geode would be useful.
The service should also provide a callback for unequal values so that the application can take some action to repair the sites (e.g. applying the value in one site to the other).
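One possible shape for such a callback is sketched below. Everything in it is hypothetical: the DifferenceListener interface, the verify method and the repair policy are illustrations rather than part of the WanVerificationService or of Apache Geode, and plain Maps stand in for the Regions in each site.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class RepairCallbackExample {
  /** Hypothetical callback invoked once for every key whose values differ. */
  interface DifferenceListener {
    void valuesDiffer(Object key, Object site1Value, Object site2Value);
  }

  /** Compares every site 1 entry with site 2 and reports each mismatch to the listener. */
  static int verify(Map<Object, Object> site1, Map<Object, Object> site2,
      DifferenceListener listener) {
    int differences = 0;
    for (Map.Entry<Object, Object> entry : site1.entrySet()) {
      Object site2Value = site2.get(entry.getKey());
      if (!Objects.equals(entry.getValue(), site2Value)) {
        differences++;
        listener.valuesDiffer(entry.getKey(), entry.getValue(), site2Value);
      }
    }
    return differences;
  }

  public static void main(String[] args) {
    Map<Object, Object> site1 = new HashMap<>();
    site1.put(0, "MRK");
    site1.put(1, "UNH");
    Map<Object, Object> site2 = new HashMap<>();
    site2.put(0, "PFE");

    // Repair policy (an assumption): treat site 1 as the source of truth.
    int repaired = verify(site1, site2,
        (key, site1Value, site2Value) -> site2.put(key, site1Value));

    System.out.println("repaired=" + repaired + "; sitesEqual=" + site1.equals(site2));
    // prints: repaired=2; sitesEqual=true
  }
}
```

Which site wins is an application decision; a real repair would also need to decide whether the repaired value should itself be distributed over the WAN.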
There are a number of Apache Geode enhancements that would be helpful in this scenario including the ability to: