Synchronization in Distributed Task Processing: Leveraging Distributed Locks







Rahul Kumar (@rahul)

We live in an era where a single machine or process is often not enough to meet all requirements. Most of the time we need multiple machines to do the job. When a job is processed by multiple distributed processes, they often need to coordinate and synchronise at some point. In this article we'll use the concept of a distributed lock and implement one.

Prerequisites

  • Java - Code examples are provided in Java
  • ZooKeeper - We'll use ZooKeeper to implement the distributed lock

Why do we need distributed locks?

  • Resource Management :- To prevent contention and potential race conditions in a distributed system
  • Distributed Task Scheduling :- To make sure that one task is executed by only one node in a distributed system.
  • Leader Election :- To elect a leader in the distributed system who is responsible for making critical decisions and managing shared resources.
  • Distributed Transaction :- To ensure that multiple resources or databases participate in a transaction in a coordinated and mutually exclusive manner.
  • Cache consistency :- To coordinate cache updates and prevent multiple nodes from updating the same cache simultaneously.

What is ZooKeeper?

I am assuming that you know Java and ZooKeeper; if you don't, please refer to the official documentation. I'll cover only the parts of ZooKeeper that are required for this article.

ZooKeeper has a hierarchical namespace, much like a distributed file system. Each node in ZooKeeper can have children, just like directories can have files. Each node in ZooKeeper is referred to as a znode.

ZooKeeper has a notion of ephemeral nodes. These nodes exist only as long as the session that created them is alive.

ZooKeeper has a notion of sequence nodes. When creating sequence nodes, ZooKeeper appends a monotonically increasing counter at the end of the path. The counter is unique to the parent node.
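To make the naming scheme concrete, here is a minimal sketch that mimics it without talking to ZooKeeper. ZooKeeper formats the counter as a 10-digit, zero-padded number; the SequenceNames class and its methods are hypothetical helpers written for this illustration, not part of the ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: it only mimics ZooKeeper's naming of sequence nodes.
class SequenceNames {
    // Appends the counter as a 10-digit, zero-padded decimal suffix.
    static String withSequence(String prefix, int counter) {
        return prefix + String.format("%010d", counter);
    }

    // Reads the counter back from the last 10 characters of a znode path.
    static int sequenceOf(String path) {
        return Integer.parseInt(path.substring(path.length() - 10));
    }

    public static void main(String[] args) {
        List<String> children = new ArrayList<>();
        children.add(withSequence("", 10));
        children.add(withSequence("", 2));
        // Zero padding makes plain string sorting agree with numeric order.
        Collections.sort(children);
        System.out.println(children);
        System.out.println(sequenceOf("/abc.json/0000000002"));
    }
}
```

Because of the zero padding, sorting the child names as plain strings gives the same order as sorting them by counter, which is what the lock implementation later in this article relies on.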

ZooKeeper watches are one-time triggers sent to the client that set the watch. A watch set on /znode-xx will be triggered when /znode-xx changes.
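The one-time behaviour can be illustrated with a small in-memory sketch. It does not use ZooKeeper at all; OneTimeWatchRegistry and its methods are hypothetical names chosen for this example, and the only thing it models is that a watch is consumed when it fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only: models the "fires once, then is gone" semantics.
class OneTimeWatchRegistry {
    private final Map<String, List<Runnable>> watches = new HashMap<>();

    // Registers a callback for the next change of the given path.
    void watch(String path, Runnable callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    // Simulates a change: runs and removes all watches on the path.
    // Returns how many callbacks were triggered.
    int change(String path) {
        List<Runnable> fired = watches.remove(path);
        if (fired == null) {
            return 0;
        }
        fired.forEach(Runnable::run);
        return fired.size();
    }

    public static void main(String[] args) {
        OneTimeWatchRegistry registry = new OneTimeWatchRegistry();
        registry.watch("/znode-xx", () -> System.out.println("/znode-xx changed"));
        System.out.println(registry.change("/znode-xx")); // first change triggers the watch
        System.out.println(registry.change("/znode-xx")); // watch is gone unless it is set again
    }
}
```

A client that wants to keep observing a znode therefore has to set the watch again after each notification.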

A Simple Cloud Storage Service

Imagine a cloud storage service that allows clients to upload/download/edit files. Multiple clients may need to access and modify the same file concurrently. However, to ensure data consistency and avoid conflicts, we need to coordinate access to the files and prevent multiple clients from modifying the same file simultaneously.

  • File upload/edit - When a client wants to upload a file to the cloud storage service, it needs to acquire a distributed lock associated with the file being uploaded. The lock ensures that only one client can modify/upload the file at a time to avoid conflict and data corruption.
  • File download - Similarly, when a client wants to download a file it needs to acquire the lock first to ensure that no other clients are modifying the file at the same time.
  • File deletion - A distributed lock can be used to ensure that no clients are accessing the file while it is being deleted.

By using distributed locks, the cloud storage service can maintain data integrity and ensure that multiple clients can safely access and modify files without causing conflicts.

Implementation

Suppose a client wants to modify a file named abc.json. It has to follow these steps:

  • Acquire the lock
  • Modify the file
  • Release the lock
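The three steps above can be sketched with a local lock standing in for the distributed one. This is only an illustration of the acquire, modify, release pattern under that assumption: GuardedEdit is a hypothetical class, and java.util.concurrent's ReentrantLock protects threads within one JVM, not separate processes.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Local stand-in: a ReentrantLock plays the role of the lock on abc.json.
class GuardedEdit {
    private final Lock lock = new ReentrantLock();
    private final StringBuilder fileContents = new StringBuilder();

    // Acquire, modify, release. The finally block releases the lock
    // even if the modification throws.
    void append(String text) {
        lock.lock();
        try {
            fileContents.append(text);
        } finally {
            lock.unlock();
        }
    }

    String contents() {
        return fileContents.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedEdit file = new GuardedEdit();
        Thread first = new Thread(() -> file.append("a"));
        Thread second = new Thread(() -> file.append("b"));
        first.start();
        second.start();
        first.join();
        second.join();
        // Both edits are applied; their relative order depends on scheduling.
        System.out.println(file.contents().length());
    }
}
```

Releasing in a finally block is a design choice worth carrying over to a distributed lock as well, so that a failed modification does not leave other clients waiting.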

In ZooKeeper, we can use znodes to provide a distributed lock on any resource.

Any client that wants to acquire a lock on abc.json first has to create a znode at the path /abc.json if it doesn't already exist. To request the lock, it then creates an ephemeral child node with the sequence flag under /abc.json.

With the sequence flag, ZooKeeper automatically appends a sequence number that is greater than any previously appended to a child of /abc.json. The process that created the znode with the smallest sequence number acquires the lock.

The process that wants to release the lock will delete the znode created by that process.

The next process with the next smallest sequence number will acquire the lock, and so on...

Suppose we have four clients. Below are ephemeral nodes created by each client.

       /abc.json
    - /0000000000 // created by client 3 
    - /0000000001 // created by client 2
    - /0000000002 // created by client 1
    - /0000000003 // created by client 4

    

Note: we use ephemeral nodes because ZooKeeper deletes them automatically when the session of the client that created them ends, so a crashed client cannot hold the lock forever.

Lock Acquisition

Client 3 will be able to acquire the lock on /abc.json because its sequence number is the smallest among all the nodes.

Once it has finished processing, client 3 will delete its znode with the sequence number 0000000000. Now client 2 has the smallest sequence number, so it is allowed to acquire the lock.

Similarly, clients 1 and 4 will be provided with the lock on the shared resource sequentially.
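The hand-over described above can be replayed with a small in-memory simulation. It involves no ZooKeeper connection: LockQueueSimulation is a hypothetical class in which a sorted map stands in for the children of /abc.json, and removing the first entry stands in for deleting a znode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory stand-in for the children of /abc.json; no ZooKeeper involved.
class LockQueueSimulation {
    // Sorted map: sequence-numbered child name -> client that created it.
    private final TreeMap<String, String> children = new TreeMap<>();
    private int nextSequence = 0;

    // Mimics creating an ephemeral sequential child; returns the child name.
    String enqueue(String client) {
        String name = String.format("%010d", nextSequence++);
        children.put(name, client);
        return name;
    }

    // The creator of the smallest child currently holds the lock.
    String currentHolder() {
        return children.isEmpty() ? null : children.firstEntry().getValue();
    }

    // Mimics deleting the holder's znode, which hands the lock to the next child.
    void releaseCurrent() {
        children.pollFirstEntry();
    }

    // Returns the order in which clients are granted the lock.
    static List<String> grantOrder(List<String> arrivalOrder) {
        LockQueueSimulation queue = new LockQueueSimulation();
        for (String client : arrivalOrder) {
            queue.enqueue(client);
        }
        List<String> order = new ArrayList<>();
        while (queue.currentHolder() != null) {
            order.add(queue.currentHolder());
            queue.releaseCurrent();
        }
        return order;
    }

    public static void main(String[] args) {
        // Same arrival order as in the example: client 3, then 2, 1 and 4.
        System.out.println(grantOrder(List.of("client 3", "client 2", "client 1", "client 4")));
    }
}
```

The lock is granted in the order in which the child nodes were created, not in the order of the client names.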

Let's create a Watcher class, because the ZooKeeper constructor takes a Watcher instance.

      package org.example.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public class RootWatcher implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        // noop
    }
}

    

We'll create an Executor class which acts as a ZooKeeper client. We will have multiple instances of Executor running concurrently on separate threads. All of them will try to acquire a lock on the same resource and then process their tasks.

      public class Executor implements Runnable {
    private final String connectionString;
    private final int connectionTimeout;
    private final String LOCK_PATH;
    private String lockPath;
    ZooKeeper zk;

    Executor(String connectionString, int connectionTimeout, String lockPath) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
        this.LOCK_PATH = lockPath;
    }

    @Override
    public void run() {
         // noop
    }
}

    

There will be a main class responsible for creating instances of executors and starting them.

      package org.example.lock;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

public class LockDemo {
    static Logger logger = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
    public static void main(String[] args) {
        logger.setLevel(Level.ERROR);
        String connectionString="localhost:2181";
        int connectionTimeout = 3000;
        String lockPath = "/distributed-lock.json";

        int executorsCount = 10;

        for(int i=0; i<executorsCount; i++){
            Executor executor = new Executor(connectionString,connectionTimeout,lockPath);
            new Thread(executor).start();
        }
    }
}

    

The LockDemo class creates 10 executors and starts each of them on its own thread.

We have a Zk class which creates ZooKeeper connections.

      package org.example.lock;

import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;

public class Zk {
    private final String connectionString;
    private final int connectionTimeout;

    Zk(String connectionString, int connectionTimeout){
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
    }

    ZooKeeper createZkClient() throws IOException {
        RootWatcher watcher = new RootWatcher();
        return new ZooKeeper(connectionString, connectionTimeout, watcher);
    }
}
    

Now, we have all the classes required for this article. We can start implementing them.

The executor will first create a ZooKeeper connection.

In ZooKeeper, you cannot create a child without a parent. For example, you cannot create a node named /locks/L0 if the /locks node doesn't exist.

So, the very first thing the executor does is create the lock node, i.e. the parent.

      public class Executor implements Runnable {
    private final String connectionString;
    private final int connectionTimeout;
    private final String LOCK_PATH;
    private String lockPath;
    ZooKeeper zk;

    Executor(String connectionString, int connectionTimeout, String lockPath) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
        this.LOCK_PATH = lockPath;
    }

    public void createLockNode() throws InterruptedException, KeeperException {
        if (this.zk.exists(LOCK_PATH, false) != null) {
            return;
        }    

        try{
            this.zk.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }catch (KeeperException.NodeExistsException ex){
            // noop - the node already exists, another executor created it first
            // any other KeeperException propagates to the caller
        }
    }

    @Override
    public void run() {
        try {
            // create zookeeper client
            this.zk = new Zk(connectionString, connectionTimeout)
                    .createZkClient();

            // create a lock node if it does not exist already
            this.createLockNode();
        } catch (IOException | InterruptedException | KeeperException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}

    

After creating the parent node, each executor will start processing its tasks. Each executor has 10 tasks.

          public void doProcessing() throws InterruptedException, KeeperException {
        for (var task = 0; task < 10; task++) {
            doTask(task);
        }
    }

    

The doTask method first acquires the lock, then performs the task, and finally releases the lock.

          boolean acquireLock() throws InterruptedException, KeeperException {
        if(this.lockPath == null) {
            this.lockPath = this.zk.create(LOCK_PATH + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }

        List<String> children = zk.getChildren(LOCK_PATH,false);
        Collections.sort(children);
        String smallestChild = children.get(0);
        return this.lockPath.endsWith(smallestChild);
    }

    void releaseLock() throws InterruptedException, KeeperException {
        zk.delete(this.lockPath, -1);
        this.lockPath = null;
    }

    public void doTask(int task) throws InterruptedException, KeeperException {
        long oneSecond = Duration
                .ofSeconds(1)
                .toMillis();

        while (!this.acquireLock()) {
            Thread.sleep(oneSecond);
        }

        // %n ends the line without the stray leading space that "\n " put on the next line
        System.out.printf("Lock acquired for task %d by thread %s%n", task, Thread.currentThread().getName());
        Thread.sleep(oneSecond); // busy processing simulation
        this.releaseLock();
        System.out.printf("Lock released for task %d by thread %s%n", task, Thread.currentThread().getName());
    }

    

Complete Executor class implementation

      package org.example.lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class Executor implements Runnable {
    private final String connectionString;
    private final int connectionTimeout;
    private final String LOCK_PATH;
    private String lockPath;
    ZooKeeper zk;

    Executor(String connectionString, int connectionTimeout, String lockPath) {
        this.connectionString = connectionString;
        this.connectionTimeout = connectionTimeout;
        this.LOCK_PATH = lockPath;
    }

    public void createLockNode() throws InterruptedException, KeeperException {
        if (zk.exists(LOCK_PATH, false) != null) {
            return;
        }

        try{
            zk.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }catch (KeeperException.NodeExistsException ex){
            // noop - the node already exists, another executor created it first
        }
    }

    boolean acquireLock() throws InterruptedException, KeeperException {
        if(this.lockPath == null) {
            this.lockPath = this.zk.create(LOCK_PATH + "/", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        }

        List<String> children = zk.getChildren(LOCK_PATH,false);
        Collections.sort(children);
        String smallestChild = children.get(0);
        return this.lockPath.endsWith(smallestChild);
    }

    void releaseLock() throws InterruptedException, KeeperException {
        zk.delete(this.lockPath, -1);
        this.lockPath = null;
    }

    public void doTask(int task) throws InterruptedException, KeeperException {
        long oneSecond = Duration
                .ofSeconds(1)
                .toMillis();

        while (!this.acquireLock()) {
            Thread.sleep(oneSecond);
        }

        // %n ends the line without the stray leading space that "\n " put on the next line
        System.out.printf("Lock acquired for task %d by thread %s%n", task, Thread.currentThread().getName());
        Thread.sleep(oneSecond); // busy processing simulation
        this.releaseLock();
        System.out.printf("Lock released for task %d by thread %s%n", task, Thread.currentThread().getName());
    }

    public void doProcessing() throws InterruptedException, KeeperException {
        for (var task = 0; task < 10; task++) {
            doTask(task);
        }
    }

    @Override
    public void run() {
        try {
            // create zookeeper client
            zk = new Zk(connectionString, connectionTimeout)
                    .createZkClient();

            // create a lock node if it does not exist already
            this.createLockNode();
        } catch (IOException | InterruptedException | KeeperException e) {
            throw new RuntimeException(e);
        }

        try {
            doProcessing();
        } catch (InterruptedException | KeeperException e) {
            throw new RuntimeException(e);
        }
    }
}

    

Our distributed lock is ready and here is the output.

    Lock acquired for task 0 by thread Thread-9
    Lock released for task 0 by thread Thread-9
    Lock acquired for task 0 by thread Thread-7
    Lock released for task 0 by thread Thread-7
    Lock acquired for task 0 by thread Thread-1
    .............
    .........
    ...

    

The above design has one issue: every waiting client calls zk.getChildren(...) inside acquireLock once per second in a loop. With a large number of clients waiting for the same resource, this polling floods ZooKeeper with requests.

To avoid a large number of calls to zk.getChildren(...), we can use ZooKeeper watches. The idea is that a client whose node is not the smallest sets a watch on the node whose sequence number immediately precedes its own, and rechecks the lock only when that watch fires.
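The core of that idea is choosing which node to watch. The sketch below shows only that selection step as a pure function over a list of child names; PredecessorFinder and nodeToWatch are hypothetical names, and this is not the implementation from the commit mentioned in this article. With the real client, the returned name could then be watched with a call such as zk.exists(path, watcher).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: picks the znode a waiting client should watch.
class PredecessorFinder {
    // Returns the child just before ownNode in sorted order,
    // or empty if ownNode is the smallest and therefore holds the lock.
    static Optional<String> nodeToWatch(List<String> children, String ownNode) {
        List<String> sorted = new ArrayList<>(children);
        Collections.sort(sorted);
        int index = sorted.indexOf(ownNode);
        if (index < 0) {
            throw new IllegalArgumentException("own node not among children: " + ownNode);
        }
        return index == 0 ? Optional.empty() : Optional.of(sorted.get(index - 1));
    }

    public static void main(String[] args) {
        List<String> children = List.of("0000000002", "0000000000", "0000000003", "0000000001");
        // The owner of 0000000002 waits only on 0000000001.
        System.out.println(nodeToWatch(children, "0000000002"));
        // The owner of 0000000000 has nothing to watch: it holds the lock.
        System.out.println(nodeToWatch(children, "0000000000"));
    }
}
```

Since each client watches only its immediate predecessor, deleting a znode notifies a single waiting client instead of all of them.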

You can check this commit to get an idea of how this can be implemented.


© 2023 @dsabyte. All rights reserved