This article describes a way to send the delta bytes between the DistributedSystems instead of sending the entire object bytes.
Apache Geode provides a Delta interface that facilitates serializing the changes to an object between two JVMs rather than the entire object when changes are made to that object. For large objects, this provides an optimization that is supported from:
Sending Deltas from servers in one DistributedSystem to servers in another (e.g. two WAN sites) is not supported. Currently, each event sent between the DistributedSystems contains the entire object. Normally, objects are stored in Regions as byte arrays. For Deltas, that is not the case. Instead, Deltas are represented as fully-deserialized objects. When a change to a Delta is received, it is applied to the in-memory object. Combine this with the fact that sending Deltas between DistributedSystems is not supported, and that means the entire object is serialized each time it is updated in the sending DistributedSystem and deserialized in the receiving one. Since Deltas are mainly used for objects that can grow very large (like sessions), this can be inefficient.
This article describes a way to send the delta bytes between the DistributedSystems instead of sending the entire object bytes.
For this implementation, each event travels the path below between a client in the sending DistributedSystem and a server in the receiving DistributedSystem:
Note: The GatewaySender
and GatewayReceiver
in these steps actually encompass several different objects.
This diagram shows the architecture of this implementation with one server in each DistributedSystem for simplicity:
The Region configuration of the above architecture looks like this in XML:
<region name="Trade" refid="PARTITION_REDUNDANT">
<region-attributes>
<cache-writer>
<class-name>example.server.callback.GatewaySenderDeltaCacheWriter</class-name>
</cache-writer>
<cache-listener>
<class-name>example.server.callback.GatewaySenderDeltaCacheListener</class-name>
</cache-listener>
</region-attributes>
</region>
<region name="Trade_gateway_sender_delta_proxy" refid="PARTITION_REDUNDANT">
<region-attributes gateway-sender-ids="ny">
<partition-attributes colocated-with="/Trade" redundant-copies="1"/>
<cache-writer>
<class-name>example.server.callback.GatewaySenderProxyCacheWriter</class-name>
</cache-writer>
</region-attributes>
</region>
There are a few caveats to this implementation:
All source code described in this article as well as an example usage is available here.
The implementation consists of the following three CacheCallback classes:
The GatewaySenderDeltaCacheWriter process method:
The tail key is the key in the GatewaySender queue. In normal GatewaySender-enabled
regions, the tail key is initialized by the primary BucketRegion’s handleWANEvent method. It is then replicated to redundant servers. Since the data Region in this case is not GatewaySender-enabled
, this doesn’t happen. Once the tail key is initialized in the event, it is set into the callback argument. This is done because the tail key is only replicated between servers in GatewaySender-enabled
Regions. It is ignored in non-GatewaySender-enabled
Regions.
private void process(EntryEvent event) {
EntryEventImpl eei = (EntryEventImpl) event;
if (!isFromRemoteWANSite(eei)) {
// Update the tailKey (which is the key in the queue)
// The tailKey is set by handleWANEvent in the event in the primary.
// It won't be called in this case since the data region is not WAN-enabled.
setTailKey(eei);
// Set the callback argument since the tail key is not serialized between members
// if the region is not wan-enabled.
eei.setCallbackArgument(eei.getTailKey());
}
}
The GatewaySenderDeltaCacheWriter setTailKey
method invokes the BucketRegion’s handleWANEvent
method to set the tail key.
private void setTailKey(EntryEventImpl event) {
PartitionedRegion pr = (PartitionedRegion) getProxyRegion(event.getRegion());
BucketRegion br = pr.getBucketRegion(event.getKey());
br.handleWANEvent(event);
}
The GatewaySenderDeltaCacheListener process method:
EntryEvent
using the proxy Region and input EntryEvent
EntryEvent
to each GatewaySenderprivate void process(EntryEvent event) {
EntryEventImpl eei = (EntryEventImpl) event;
if (!isFromRemoteWANSite(eei)) {
// Get the GatewaySender proxy region
PartitionedRegion proxyRegion = (PartitionedRegion) getProxyRegion(event.getRegion());
// Create the appropriate event
EntryEventImpl proxyEvent = createProxyEntryEvent(eei, proxyRegion);
// Add the event to any GatewaySender queues
deliverToGatewaySenderQueues(proxyEvent);
}
}
The GatewaySenderDeltaCacheListener createProxyEntryEvent
method creates the EntryEvent
on the proxy Region.
The EntryEvent
contains:
private EntryEventImpl createProxyEntryEvent(EntryEventImpl event, PartitionedRegion proxyRegion) {
byte[] newValue = null;
boolean isDelta;
Operation operation;
if (event.getDeltaBytes() != null) {
newValue = event.getDeltaBytes();
operation = Operation.UPDATE;
isDelta = true;
} else if (event.getCachedSerializedNewValue() != null) {
newValue = event.getCachedSerializedNewValue();
operation = Operation.CREATE;
isDelta = false;
} else {
operation = Operation.DESTROY;
isDelta = false;
}
EntryEventImpl proxyEvent = EntryEventImpl.create(proxyRegion, operation, event.getKey(), newValue, isDelta /*callbackArg*/, event.isOriginRemote(), event.getDistributedMember(), false /* generateCallbacks */, event.getEventId());
proxyEvent.setContext(event.getContext());
proxyEvent.setVersionTag(event.getVersionTag());
proxyEvent.setTailKey((Long) event.getCallbackArgument());
return proxyEvent;
}
The GatewaySenderDeltaCacheListener deliverToGatewaySenderQueues
method retrieves the proxy Region’s GatewaySenders and distributes the event to each one.
private void deliverToGatewaySenderQueues(EntryEventImpl wanEvent) {
Region region = wanEvent.getRegion();
Cache cache = region.getCache();
Set<String> senderIds = region.getAttributes().getGatewaySenderIds();
for (String senderId : senderIds) {
// Get the GatewaySender
AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId);
// Distribute the EntryEvent to the GatewaySender
sender.distribute(getEnumListenerEvent(wanEvent.getOperation()), wanEvent, getRemoteDsIds(cache, senderIds));
}
}
The GatewaySenderProxyCacheWriter process method:
LocalRegion
basicBridgePut or basicBridgeDestroy method depending on the EntryEvent
’s operation and boolean callback argument. The basicBridgePut method is invoked with either the full bytes or delta bytes from the input EntryEvent
.private void process(EntryEvent event) {
EntryEventImpl eei = (EntryEventImpl) event;
if (isFromRemoteWANSite(eei)) {
byte[] newValue = (byte[]) eei.getNewValue();
Operation operation = event.getOperation();
boolean callbackArg = (Boolean) event.getCallbackArgument();
if (event.getOperation().isDestroy()) {
getDataRegion(event.getRegion()).basicBridgeDestroy(event.getKey(), eei.getRawCallbackArgument(), eei.getContext(), false, getClientEvent(eei));
} else {
Object value = null;
byte[] deltaBytes = null;
boolean isObject = false;
if (callbackArg) {
deltaBytes = (byte[]) eei.getNewValue();
} else {
value = eei.getNewValue();
isObject = true;
}
getDataRegion(event.getRegion()).basicBridgePut(event.getKey(), value, deltaBytes, isObject, eei.getRawCallbackArgument(), eei.getContext(), false, getClientEvent(eei));
}
}
}
The GatewaySenderEventImpl represents an event being sent between two DistributedSystems. It needs to be modified to be able to store the delta bytes in the sending DistributedSystem, and the GatewayReceiverCommand should be modified to be able to apply those delta bytes in the receiving DistributedSystem.
In addition, the sending DistributedSystem currently has no knowledge of the state of the objects in the receiving DistributedSystem. This has to be changed so that the sending DistributedSystem knows when it must send the full bytes rather than the delta bytes in the case where the receiving DistributedSystem doesn’t have the full object.
One potential way to do this is to modify the AbstractGatewaySenderEventProcessor. The AbstractGatewaySenderEventProcessor creates GatewaySenderEventImpls
, builds batches of these and causes them to be sent to the receiving DistributedSystem. It could be modified to track which objects in the receiving DistributedSystem require full object bytes rather than the delta bytes. This can be done by tracking the time when the connection to the receiving DistributedSystem was made and also the last time the full object bytes were sent for each entry. If the entry time is before the connection time, the full bytes would be resent; otherwise the delta bytes would be sent. From the sending DistributedSystem’s perspective, if no connection can be made to any server in the receiving DistributedSystem, it is down. When it comes back up (when the sending DistributedSystem can connect to it), it could potentially be a brand new DistributedSystem. The sending DistributedSystem would have no knowledge of this, so the full bytes would have to be sent.
Another potential way to do this is to modify the GatewayAck and GatewaySenderEventRemoteDispatcher. The GatewayAck
is the acknowledgement returned from the receiving DistributedSystem for each batch of GatewaySenderEventImpls
. The GatewaySenderEventRemoteDispatcher
process the GatewayAcks
. The GatewayAck
currently contains among other fields a collection of exceptions that occur while processing the batch on the receiving DistributedSystem. The collection could be modified to contain an InvalidDeltaException
for each entry that doesn’t exist on the remote DistributedSystem. For each one, the GatewaySenderEventRemoteDispatcher
in the sending DistributedSystem could be modified to create and enqueue a GatewaySenderEventImpl with the full bytes.