Conditional Writer
Accumulo Tour: Conditional Writer
Tour page 11 of 13
Suppose the Gotham PD is storing home addresses for persons of interest in Accumulo. We want to correctly handle the case of multiple users editing the same address at the same time. The following sequence of events shows an example of how this can go wrong.
- User 0 sets the key id0001:location:home to 1007 Mountain Drive, Gotham, New York
- User 1 reads id0001:location:home
- User 2 reads id0001:location:home
- User 1 replaces Drive with Dr
- User 2 replaces New York with NY
- User 1 sets the key id0001:location:home to 1007 Mountain Dr, Gotham, New York
- User 2 sets the key id0001:location:home to 1007 Mountain Drive, Gotham, NY
In this situation User 2 overwrites User 1's change, so the stored address ends up as 1007 Mountain Drive, Gotham, NY instead of 1007 Mountain Dr, Gotham, NY. To handle this correctly, Accumulo offers the ConditionalWriter. The ConditionalWriter atomically checks conditions on a row and applies a mutation only when all conditions are satisfied.
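To make the lost update concrete, here is a minimal in-memory sketch in plain Java that replays the sequence above twice. It is an analogy, not Accumulo code: a ConcurrentHashMap stands in for the GothamPD table, and its atomic replace(key, oldValue, newValue) plays the role of a conditional write that is applied only if the stored value still equals what the user read. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory analogy only: ConcurrentHashMap stands in for the GothamPD table.
// This is NOT the Accumulo API.
class LostUpdateReplay {
    static final String KEY = "id0001:location:home";
    static final String START = "1007 Mountain Drive, Gotham, New York";

    // Replays the User 1 / User 2 sequence with unconditional writes.
    static String unconditional() {
        ConcurrentHashMap<String, String> table = new ConcurrentHashMap<>();
        table.put(KEY, START);                               // User 0 writes
        String seenBy1 = table.get(KEY);                     // User 1 reads
        String seenBy2 = table.get(KEY);                     // User 2 reads
        table.put(KEY, seenBy1.replace("Drive", "Dr"));      // User 1 writes
        table.put(KEY, seenBy2.replace("New York", "NY"));   // User 2 overwrites User 1
        return table.get(KEY);
    }

    // Same sequence, but each write succeeds only if the value is unchanged since it was read.
    static String conditional() {
        ConcurrentHashMap<String, String> table = new ConcurrentHashMap<>();
        table.putIfAbsent(KEY, START);                       // succeeds only if the key is absent
        String seenBy1 = table.get(KEY);
        String seenBy2 = table.get(KEY);
        table.replace(KEY, seenBy1, seenBy1.replace("Drive", "Dr"));   // accepted
        boolean ok = table.replace(KEY, seenBy2, seenBy2.replace("New York", "NY"));
        while (!ok) {                                        // rejected: re-read and retry
            String fresh = table.get(KEY);
            ok = table.replace(KEY, fresh, fresh.replace("New York", "NY"));
        }
        return table.get(KEY);
    }

    public static void main(String[] args) {
        System.out.println("unconditional: " + unconditional());
        System.out.println("conditional:   " + conditional());
    }
}
```

The unconditional replay ends with 1007 Mountain Drive, Gotham, NY, while the conditional replay rejects User 2's first write, retries it against the fresh value, and ends with 1007 Mountain Dr, Gotham, NY.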
Exercise
The following code simulates the concurrency in the situation above. The code starts multiple threads, and each thread does the following:
- Read the key’s value into memory using a scanner.
- Modify the copy in memory.
- Write out the modified value using a batch writer.
- If the write was unsuccessful, go back to the first step.
This process can result in threads overwriting each other’s changes. The problem is that the batch writer always applies the update, even when the value has changed since it was read, and setAddress() always reports success, so no thread ever retries.
To simplify, we will create several small methods to illustrate the issue.
jshell> String getAddress(AccumuloClient client, String id) {
...> try (org.apache.accumulo.core.client.Scanner scan = new IsolatedScanner(client.createScanner("GothamPD", Authorizations.EMPTY))) {
...> scan.setRange(Range.exact(id, "location", "home"));
...> for (Map.Entry<Key, Value> entry : scan) {
...> return entry.getValue().toString();
...> }
...> return null;
...> } catch (TableNotFoundException e) {
...> throw new RuntimeException(e);
...> }
...> }
| created method getAddress(AccumuloClient,String)
jshell> boolean setAddress(AccumuloClient client, String id, String expectedAddr, String newAddr) {
...> try (BatchWriter writer = client.createBatchWriter("GothamPD")) {
...> Mutation mutation = new Mutation(id);
...> mutation.put("location", "home", newAddr);
...> writer.addMutation(mutation);
...> return true;
...> } catch (Exception e) {
...> throw new RuntimeException(e);
...> }
...> }
| created method setAddress(AccumuloClient,String,String,String)
jshell> Future<Void> modifyAddress(AccumuloClient client, String id, Function<String,String> modifier) throws Exception {
...> return CompletableFuture.runAsync(() -> {
...> String currAddr, newAddr;
...> do {
...> currAddr = getAddress(client, id);
...> newAddr = modifier.apply(currAddr);
...> System.out.printf("Thread %3d attempting change %20s -> %-20s\n",
...> Thread.currentThread().getId(), "'"+currAddr+"'", "'"+newAddr+"'");
...> } while (!setAddress(client, id, currAddr, newAddr));
...> });
...> }
| created method modifyAddress(AccumuloClient,String,Function<String,String>)
jshell> void concurrent_writes() throws Exception {
...> try {
...> client.tableOperations().create("GothamPD");
...> } catch (TableExistsException e) {
...> System.out.println("GothamPD table already exists...proceeding...");
...> }
...> String id = "id0001";
...> setAddress(client, id, null, " 1007 Mountain Drive, Gotham, New York ");
...> Future<Void> future1 = modifyAddress(client, id, String::trim);
...> Future<Void> future2 = modifyAddress(client, id, addr -> addr.replace("Drive", "Dr"));
...> Future<Void> future3 = modifyAddress(client, id, addr -> addr.replace("New York", "NY"));
...> future1.get();
...> future2.get();
...> future3.get();
...> System.out.println("Final address : '" + getAddress(client, id) + "'");
...> }
| created method concurrent_writes()
The following is one of a few possible outputs. Notice that only the modification of Drive to Dr shows up in the final output. The other modifications were lost.
jshell> concurrent_writes()
GothamPD table already exists...proceeding...
Thread 52 attempting change ' 1007 Mountain Drive, Gotham, New York ' -> ' 1007 Mountain Drive, Gotham, NY '
Thread 38 attempting change ' 1007 Mountain Drive, Gotham, New York ' -> '1007 Mountain Drive, Gotham, New York'
Thread 53 attempting change ' 1007 Mountain Drive, Gotham, New York ' -> ' 1007 Mountain Dr, Gotham, New York '
Final address : ' 1007 Mountain Dr, Gotham, New York '
To fix this example, make the following changes in setAddress() to use a ConditionalWriter.
- Call createConditionalWriter instead of creating a batch writer.
- Create a Condition for the column ‘location:home’. If expectedAddr is not null, call setValue, passing it expectedAddr. If expectedAddr is null, do nothing else with the condition. A condition with no value means that the column is expected to be absent.
- Replace the Mutation with a ConditionalMutation and pass the condition to its constructor.
- Call write, passing it the conditional mutation.
- Return true if getStatus on the Result returned by write is ACCEPTED.
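For intuition about why the retry loop converges once writes are conditional, the following is a self-contained simulation of the fixed exercise that needs no Accumulo cluster. It assumes a ConcurrentHashMap in place of the table; the hypothetical write() method mimics a conditional mutation by applying a value only if the current value equals expectedAddr or, when expectedAddr is null, only if the key is absent. The Status enum merely borrows the ACCEPTED/REJECTED names from ConditionalWriter.Status. In the real exercise, Accumulo performs these checks through createConditionalWriter, Condition, and ConditionalMutation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// In-memory simulation of the fixed exercise. The map stands in for the GothamPD
// table and Status only mirrors the names of ConditionalWriter.Status; neither is
// the Accumulo API.
class ConditionalRetry {
    enum Status { ACCEPTED, REJECTED }

    static final ConcurrentHashMap<String, String> TABLE = new ConcurrentHashMap<>();

    // Atomically checks the condition and applies the write only if it holds.
    // A null expectedAddr means the column must be absent, like a Condition with no value.
    static Status write(String id, String expectedAddr, String newAddr) {
        boolean applied = (expectedAddr == null)
            ? TABLE.putIfAbsent(id, newAddr) == null
            : TABLE.replace(id, expectedAddr, newAddr);
        return applied ? Status.ACCEPTED : Status.REJECTED;
    }

    static boolean setAddress(String id, String expectedAddr, String newAddr) {
        return write(id, expectedAddr, newAddr) == Status.ACCEPTED;
    }

    // Same read-modify-write loop as modifyAddress() in the exercise, minus the printf.
    static CompletableFuture<Void> modifyAddress(String id, Function<String, String> modifier) {
        return CompletableFuture.runAsync(() -> {
            String currAddr, newAddr;
            do {
                currAddr = TABLE.get(id);
                newAddr = modifier.apply(currAddr);
            } while (!setAddress(id, currAddr, newAddr));   // rejected: re-read and retry
        });
    }

    static String concurrentWrites() {
        String id = "id0001";
        TABLE.remove(id);                                   // start empty so this can be re-run
        setAddress(id, null, " 1007 Mountain Drive, Gotham, New York ");
        CompletableFuture.allOf(
            modifyAddress(id, String::trim),
            modifyAddress(id, addr -> addr.replace("Drive", "Dr")),
            modifyAddress(id, addr -> addr.replace("New York", "NY"))).join();
        return TABLE.get(id);
    }

    public static void main(String[] args) {
        System.out.println("Final address : '" + concurrentWrites() + "'");
    }
}
```

Because a rejected write sends the thread back to re-read and re-apply its change, every modification eventually lands on the latest value. Since the three modifications here do not interfere with each other, the final address is 1007 Mountain Dr, Gotham, NY regardless of how the threads interleave.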