
Java Spring shard DataSource with MySQL/Oracle

If you are implementing database sharding with Spring JDBC, you are out of luck when it comes to declarative transactions: I could not find a Spring DataSource that handles sharding. I had to implement my own DataSource manager and my own annotations to get declarative-style transactions and hide the complexity from the average developer. It's very important to abstract out cross-cutting concerns such as sharding and transactions, so that junior developers aren't confused and don't start copying code left and right without understanding the global impact of their changes.

So the idea is:
1) Implement a ShardDataSourceManager, which is basically a pool of connection pools, and look up a DataSource by shard id.
2) Define your own transactional annotation and annotate methods with it.
3) Write an interceptor at the DAO layer that reads the annotation on the method plus some context info. From the context info it looks up the shard id, looks up the DataSource, and stores it in a thread local.
4) When the DAO layer needs a DataSource, it reads the thread local, constructs a JdbcTemplate from it, and executes queries on it.
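
Before the real classes, here is a minimal, dependency-free sketch of the pattern in steps 1, 3 and 4. It is only an illustration: `FakePool`, `PoolRegistry`, `ShardContext` and the JDBC URLs in the usage note are hypothetical names that do not appear in the production code, and a plain object stands in for the real DataSource.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for a connection pool; the real code uses a DataSource. */
final class FakePool {
    final String url;

    FakePool(String url) {
        this.url = url;
    }
}

/** Step 1: a "pool of pools", looked up by shard (host) id. */
final class PoolRegistry {
    private final Map<Integer, FakePool> pools = new HashMap<>();

    void register(int hostId, FakePool pool) {
        pools.put(hostId, pool);
    }

    FakePool byHostId(int hostId) {
        FakePool pool = pools.get(hostId);
        if (pool == null) {
            throw new IllegalArgumentException("No pool configured for hostId=" + hostId);
        }
        return pool;
    }
}

/** Steps 3 and 4: bind the chosen pool to the current thread for one call. */
final class ShardContext {
    private static final ThreadLocal<FakePool> CURRENT = new ThreadLocal<>();

    /** What the interceptor does: bind, run the DAO call, always unbind. */
    static <T> T runOnShard(PoolRegistry registry, int hostId, Supplier<T> daoCall) {
        CURRENT.set(registry.byHostId(hostId));
        try {
            return daoCall.get();
        } finally {
            // Worker threads are reused, so never leave a stale binding behind.
            CURRENT.remove();
        }
    }

    /** What the DAO does: read the pool that the interceptor bound. */
    static FakePool currentPool() {
        FakePool pool = CURRENT.get();
        if (pool == null) {
            throw new IllegalStateException("No shard bound to this thread");
        }
        return pool;
    }
}
```

With two pools registered under host ids 1 and 2, a call such as `ShardContext.runOnShard(registry, 2, () -> ShardContext.currentPool().url)` sees the pool for host 2 inside the callback, and `currentPool()` fails fast once the call has returned. The point of the design is that the DAO code never has to know which shard it is talking to.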

Here is a sample ShardTransactional annotation, ShardTransactionInterceptor, and ShardDataSourceManager.

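// Runtime retention is required: the interceptor reads this annotation via reflection.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface ShardTransactional {
    boolean readOnly() default false;
}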
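
The interceptor finds the annotation with `isAnnotationPresent`, and that only works when the annotation type is declared with runtime retention. With Java's default (class-file) retention the check returns false and the method would simply run without a transaction. A small sketch with hypothetical names (`DefaultRetention`, `RuntimeRetention`, `RetentionDemo`) shows the difference:

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** No @Retention given: defaults to CLASS, so reflection cannot see it. */
@interface DefaultRetention {}

/** Explicit RUNTIME retention: visible to reflection, and so to an interceptor. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RuntimeRetention {
    boolean readOnly() default false;
}

final class RetentionDemo {
    @DefaultRetention
    void invisible() {}

    @RuntimeRetention(readOnly = true)
    void visible() {}

    /** Mirrors the interceptor's check on a method of this class. */
    static boolean isVisible(String methodName, Class<? extends Annotation> type) {
        return method(methodName).isAnnotationPresent(type);
    }

    /** Reads the readOnly attribute the same way the interceptor would. */
    static boolean readOnlyOf(String methodName) {
        return method(methodName).getAnnotation(RuntimeRetention.class).readOnly();
    }

    private static Method method(String name) {
        try {
            return RetentionDemo.class.getDeclaredMethod(name);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

Here `RetentionDemo.isVisible("invisible", DefaultRetention.class)` is false, while `RetentionDemo.isVisible("visible", RuntimeRetention.class)` is true and `RetentionDemo.readOnlyOf("visible")` is true.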

public class ShardTransactionInterceptor implements MethodInterceptor {
    private static final AppLogger logger = AppLogger.getLogger(ShardTransactionInterceptor.class);
    private static final ThreadLocal<DataSource> dataSourceThreadLocal = new ThreadLocal<DataSource>();
    private ShardDataSourceManager shardDataSourceManager;
   
    public ShardDataSourceManager getShardDataSourceManager() {
        return shardDataSourceManager;
    }

    public void setShardDataSourceManager(ShardDataSourceManager shardDataSourceManager) {
        this.shardDataSourceManager = shardDataSourceManager;
    }


    @Override
    public Object invoke(final MethodInvocation method) throws Throwable {
        if (method.getMethod().isAnnotationPresent(ShardTransactional.class)) {
            try {
                ShardTransactional annotation = method.getMethod().getAnnotation(ShardTransactional.class);
                User user = getParam(method, User.class);
                if (user == null) {
                    throw new IllegalStateException("All transactional methods must have user argument");
                }
                TransactionTemplate transactionTemplate = new TransactionTemplate();
                boolean readOnly = annotation.readOnly();
                transactionTemplate.setReadOnly(readOnly);
                ShardInfo shardInfo =  getShardInfo(user);
                transactionTemplate.setName("ShardTransaction");
                transactionTemplate.setTransactionManager(shardDataSourceManager.getTransactionManagerByHostId(shardInfo.getHostId(), readOnly));
                cacheDataSourceInThreadLocal(shardInfo.getHostId(),readOnly);
                return transactionTemplate.execute(new TransactionCallback<Object>() {
                    @Override
                    public Object doInTransaction(TransactionStatus transactionStatus) {
                        try {
                            return method.proceed();
                        } catch (Throwable t) {
                            // proceed() may throw checked exceptions: mark for rollback and rethrow unchecked
                            transactionStatus.setRollbackOnly();
                            logger.error("Rolling back transaction due to", t);
                            throw new RuntimeException(t);
                        }
                    }
                });
            } finally {
                // Clear the binding so a reused thread never sees a stale DataSource
                dataSourceThreadLocal.remove();
            }
        } else {
            return method.proceed();
        }
    }

    private ShardInfo getShardInfo(User user) {
        // ... code to look up the shard by user
        return shardInfo;
    }

    public static DataSource getDataSource() {
        return dataSourceThreadLocal.get();
    }
   
    private DataSource cacheDataSourceInThreadLocal(int hostId, boolean readOnly) {
        DataSource datasource = shardDataSourceManager.getDataSourceByHostId(hostId, readOnly);
        dataSourceThreadLocal.set(datasource);
        return datasource;
    }

    // Returns the first argument whose declared parameter type is clazz (or a subtype), else null.
    private <T> T getParam(MethodInvocation method, Class<T> clazz) {
        Class<?>[] parameterTypes = method.getMethod().getParameterTypes();
        Object[] arguments = method.getArguments();
        for (int i = 0; i < parameterTypes.length; i++) {
            if (clazz.isAssignableFrom(parameterTypes[i])) {
                return clazz.cast(arguments[i]);
            }
        }
        return null;
    }
}


public class ShardDataSourceManager {
   
    private static final AppLogger logger = AppLogger.getLogger(ShardDataSourceManager.class);
    private static boolean autoCommit = false;
   
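    // One connection pool per shard host, keyed by host id
    private Map<Integer, BasicDataSource> dataSourceMap = new HashMap<Integer, BasicDataSource>();

    // One transaction manager per shard host, keyed by host id
    private Map<Integer, DataSourceTransactionManager> transactionManagerMap = new HashMap<Integer, DataSourceTransactionManager>();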

    private ShardManager shardManager;

    private String driverClassName = "org.gjt.mm.mysql.Driver";

    private int maxActive = 20;

    private int maxIdle = 5;

    private int maxWait = 180000;
   
    private int minEvictableIdleTimeMillis = 300000;
   
    private boolean testWhileIdle = true;

    private String validationQuery = "select 1 from dual";
   
    private String userName;

    private String userPassword;

    public String getDriverClassName() {
        return driverClassName;
    }

    public void setDriverClassName(String driverClassName) {
        this.driverClassName = driverClassName;
    }

    public int getMaxActive() {
        return maxActive;
    }

    public void setMaxActive(int maxActive) {
        this.maxActive = maxActive;
    }

    public int getMaxIdle() {
        return maxIdle;
    }

    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    public int getMaxWait() {
        return maxWait;
    }

    public void setMaxWait(int maxWait) {
        this.maxWait = maxWait;
    }

    public int getMinEvictableIdleTimeMillis() {
        return minEvictableIdleTimeMillis;
    }

    public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
        this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
    }

    public boolean isTestWhileIdle() {
        return testWhileIdle;
    }

    public void setTestWhileIdle(boolean testWhileIdle) {
        this.testWhileIdle = testWhileIdle;
    }

    public String getValidationQuery() {
        return validationQuery;
    }

    public void setValidationQuery(String validationQuery) {
        this.validationQuery = validationQuery;
    }

    public String getUserPassword() {
        return userPassword;
    }

    public void setUserPassword(String userPassword) {
        this.userPassword = userPassword;
    }

    public String getUserName() {
        return userName;
    }

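    public void setUserName(String userName) {
        this.userName = userName;
    }

    // shardManager must be injected before init() is called
    public void setShardManager(ShardManager shardManager) {
        this.shardManager = shardManager;
    }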

    public void init() throws Exception {
        for (DbHost shardInfo : shardManager.getDbHosts()) {
            String url = "jdbc:mysql://" + shardInfo.getMasterHost();
            BasicDataSource dataSource = createDataSource(url, userName);
            dataSourceMap.put(shardInfo.getHostId(), dataSource);
            DataSourceTransactionManager masterTransactionManager = new DataSourceTransactionManager(dataSource);
            transactionManagerMap.put(shardInfo.getHostId(), masterTransactionManager);
            logger.info("DataSource Created for hostid= {}, url= {}", shardInfo.getHostId(), dataSource.getUrl());
        }
    }

    private BasicDataSource createDataSource(String url, String username) {
        logger.info("Creating Datasource {}", url);
        BasicDataSource dataSource = new BasicDataSource();
        dataSource.setDriverClassName(driverClassName);
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(userPassword);
        dataSource.setValidationQuery(validationQuery);
        // Use the configured values rather than hard-coded ones
        dataSource.setTestWhileIdle(testWhileIdle);
        dataSource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
        dataSource.setConnectionProperties("useUnicode=true;characterEncoding=utf8");
        dataSource.setDefaultAutoCommit(autoCommit);
        dataSource.setMaxIdle(maxIdle);
        dataSource.setMaxWait(maxWait);
        dataSource.setMaxActive(maxActive);
        return dataSource;
    }

    private DataSource getDataSourceByHostId(int hostId) {
        DataSource dataSource = dataSourceMap.get(hostId);
        if (dataSource == null) {
            logger.warn("Could not find a data source for: {}", hostId);
            throw new IllegalArgumentException("Invalid hostId, no such pool configured: " + hostId);
        }
        return dataSource;
    }

    public DataSource getDataSourceByHostId(int hostId, boolean readOnly) {
        // readOnly is ignored here: every request is routed to the shard's master
        logger.debug("Using Master datasource for hostid={}", hostId);
        DataSource dataSource = dataSourceMap.get(hostId);
        if (dataSource == null) {
            String msg = "Could not find a data source for hostId=" + hostId;
            throw new IllegalArgumentException(msg);
        }
        return dataSource;
    }

    public DataSourceTransactionManager getTransactionManagerByHostId(int hostId, boolean readOnly) {
        // readOnly is ignored here: every request uses the master's transaction manager
        logger.debug("Using Master transactionmanager for hostid={}", hostId);
        DataSourceTransactionManager transactionManager = transactionManagerMap.get(hostId);
        if (transactionManager == null) {
            String msg = "Could not find a transaction manager for hostId=" + hostId;
            throw new IllegalArgumentException(msg);
        }
        return transactionManager;
    }

    public void destroy() throws Exception {
        logger.info("destroying pools");
        destroyPool(dataSourceMap);
        transactionManagerMap.clear();
    }

    private void destroyPool(Map<Integer, BasicDataSource> dsMap) throws SQLException {
        if (dsMap != null) {
            for (BasicDataSource dataSource : dsMap.values()) {
                logger.info("Discarding pools: {}", dataSource);
                dataSource.close();
            }
        }
    }
}

Comments

  1. Seems like a neat solution. However, as I have observed, sharding eventually becomes much more than just inserts into a "shard-aware" connection pool. Cross-shard queries, transaction consistency and administration of the entire array are crucial to a good sharding solution. You can have a look at ScaleBase (disclaimer: I work there), http://www.scalebase.com, to see how this can be your one-stop shop for all of your sharding needs, totally transparent (standard conn pool... :) ).

  2. Can I get the source code for this to play with?

  3. Except for the imports, the code pasted above is the real source code we have live in production, serving 1B+ rows from 20 MySQL servers. I haven't had a chance to put it on GitHub yet.

  4. Any GitHub project? Looks nice. I'm doing similar stuff and I'd like to fork and contribute if possible.

    Replies
    1. No github project right now :( as I got busy.
