пятница, июля 16, 2010

Apache Cassandra и YAML

Иногда бывает необходимо быстро загрузить в базу или выгрузить из базы небольшой объем данных в файл. Например, строки для интернационализации приложения. В последнее время в качестве контейнера для сериализованных данных активно используется YAML. Андрей Сомов делает замечательный проект SnakeYAML для использования YAML в java. Там реализован лаконичный, с возможностями настройки, (де)сериализатор бинов, коллекций и всего прочего. Именно им я пользуюсь в случае работы с YAML.
Напомню, что любой key-value DB, и Кассандра в частности является ни чем иным, как Map<String,Map<String,String>> при строковом ключе. Это вполне позволяет сделать универсальный загрузчик данных, который и будет представлен далее.

Интерфейс
/*
 * Copyright 2010 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.applusion.cassadraspring;

import static me.prettyprint.cassandra.utils.StringUtils.bytes;
import static me.prettyprint.cassandra.utils.StringUtils.string;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import me.prettyprint.cassandra.dao.SpringCommand;
import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.Keyspace;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.support.AbstractMessageSource;

/**
 * MessageSource implementation that reads the messages from a Apache Cassandra.
 * 
 * @author Oleg Smith
 */

public class CassandraResourceBundleMessageSource extends AbstractMessageSource
  implements InitializingBean {

 private CassandraClientPool cassandraClientPool;
 private String keyspace = "Keyspace1";
 private String columnFamilyName = "Standard2";
 private ConsistencyLevel consistencyLevel = CassandraClient.DEFAULT_CONSISTENCY_LEVEL;

 /**
  * Local cache. Universal Map<String, Map<String, String>>
  */
 private Map<String, Map<String, String>> cache = new HashMap<String, Map<String, String>>();

 public CassandraResourceBundleMessageSource() {
 }

 /**
  * resolveCode implementation
  */

 @Override
 protected MessageFormat resolveCode(String code, Locale locale) {
  MessageFormat codeInCache;
  if ((codeInCache = resolveCodeInCache(code, locale)) != null) {
   return codeInCache;
  } else {
   return resolveCodeInCassandra(code, locale);
  }
 }

 /**
  * Resolve code in local cache.
  */

 private MessageFormat resolveCodeInCache(String code, Locale locale) {
  Map<String, String> cacheValues = cache.get(code);
  if (cacheValues != null) {
   String cacheValue = cacheValues.get(locale.getLanguage());
   if (cacheValue != null)
    return new MessageFormat(cacheValue, locale);
   cacheValue = cacheValues.get("en");
   if (cacheValue != null)
    return new MessageFormat(cacheValue, locale);
  }
  return null;
 }

 /**
  * Resolve code in Cassandra.
  */

 private MessageFormat resolveCodeInCassandra(String code, Locale locale) {
  List<Column> columns;
  try {
   columns = getColumnsByKey(code);
   Map<String, String> cacheValues = new HashMap<String, String>();
   for (Column columni : columns) {
    cacheValues.put(string(columni.name), string(columni.value));
   }

   cache.put(code, cacheValues);

   if (cacheValues.get(locale.getLanguage()) != null)
    return new MessageFormat(cacheValues.get(locale.getLanguage()),
      locale);
   if (cacheValues.get("en") != null)
    return new MessageFormat(cacheValues.get("en"), locale);

  } catch (Exception e) {
   e.printStackTrace();
  }
  ;
  return null;
 }

 @Override
 public void afterPropertiesSet() throws Exception {
  loadAll();
 }

 /**
  * Load all strings from Cassandra repeatedly.
  */

 private void loadAll() {
  Map<String, List<Column>> keys;
  try {
   keys = getColumnsByKeys("", 1000);
   boolean isFirstIteration = true;

   while (keys.size() != 0 && keys.size() != 1) {
    Iterator<String> it = keys.keySet().iterator();
    String key = null;
    if (!isFirstIteration)
     it.next();
    while (it.hasNext()) {
     key = it.next();
     List<Column> columns = (List<Column>) keys.get(key);
     Map<String, String> columnsValues = new HashMap<String, String>();
     for (Column columni : columns) {
      columnsValues.put(string(columni.name),
        string(columni.value));
     }
     cache.put(key, columnsValues);
    }
    if (key != null)
     keys = getColumnsByKeys(key, 1000);
    isFirstIteration = false;
   }

  } catch (Exception e) {
   e.printStackTrace();
  }
 }

 /**
  * Get 1500 columns for keyCount keys
  */

 private Map<String, List<Column>> getColumnsByKeys(final String keyStart,
   final int keyCount) throws Exception {
  return execute(new SpringCommand<Map<String, List<Column>>>(
    cassandraClientPool) {
   public Map<String, List<Column>> execute(final Keyspace ks)
     throws Exception {
    ColumnParent clp = new ColumnParent(columnFamilyName);
    SliceRange sr = new SliceRange(new byte[0], new byte[0], false,
      1500);
    SlicePredicate sp = new SlicePredicate();
    sp.setSlice_range(sr);

    KeyRange range = new KeyRange(keyCount);
    range.setStart_key(keyStart);
    range.setEnd_key("");

    Map<String, List<Column>> keys = ks.getRangeSlices(clp, sp,
      range);
    return keys;
   }
  });
 }

 /**
  * Get 1500 columns from given key
  */

 public List<Column> getColumnsByKey(final String key) throws Exception {
  return execute(new SpringCommand<List<Column>>(cassandraClientPool) {
   public List<Column> execute(final Keyspace ks) throws Exception {
    try {
     ColumnParent clp = new ColumnParent(columnFamilyName);
     SliceRange sr = new SliceRange(new byte[0], new byte[0],
       false, 1500);
     SlicePredicate sp = new SlicePredicate();
     sp.setSlice_range(sr);
     List<Column> cols = ks.getSlice(key, clp, sp);

     return cols;
    } catch (NotFoundException e) {
     return null;
    }
   }
  });
 }

 protected ColumnPath createColumnPath(String columnName) {
  return new ColumnPath(columnFamilyName).setColumn(bytes(columnName));
 }

 protected <T> T execute(SpringCommand<T> command) throws Exception {
  CassandraClient c = cassandraClientPool.borrowClient();
  Keyspace ks = c.getKeyspace(keyspace, consistencyLevel);
  try {
   return command.execute(ks);
  } finally {
   cassandraClientPool.releaseClient(ks.getClient());
  }
 }

 public CassandraClientPool getCassandraClientPool() {
  return cassandraClientPool;
 }

 public void setCassandraClientPool(CassandraClientPool cassandraClientPool) {
  this.cassandraClientPool = cassandraClientPool;
 }

 public String getKeyspace() {
  return keyspace;
 }

 public void setKeyspace(String keyspace) {
  this.keyspace = keyspace;
 }

 public String getColumnFamilyName() {
  return columnFamilyName;
 }

 public void setColumnFamilyName(String columnFamilyName) {
  this.columnFamilyName = columnFamilyName;
 }

}

Настройка Spring
<bean id="backup" class="com.applusion.cassadraspring.CassandraYAMLLoader">
  <property name="cassandraClientPool"  ref="cassandraClientPool"/>
  <property name="keyspace" ref="keyspace"/> 
 </bean>

<bean id="keyspace" class="java.lang.String">
  <constructor-arg><value>Keyspace1</value></constructor-arg>
 </bean>

<bean id="cassandraClientMonitor" class="me.prettyprint.cassandra.service.CassandraClientMonitor"/>

    <bean id="jmxMonitor" class="me.prettyprint.cassandra.service.JmxMonitor" factory-method="getInstance"/>

    <bean id="cassandraClientPoolFactory" class="me.prettyprint.cassandra.service.CassandraClientPoolFactory" factory-method="getInstance"/>

    <bean id="cassandraClientPool" factory-bean="cassandraClientPoolFactory" factory-method="createNew" >
        <constructor-arg><ref bean="cassandraHostConfigurator"/></constructor-arg>
    </bean>

    <bean id="cassandraHostConfigurator" class="me.prettyprint.cassandra.service.CassandraHostConfigurator">
        <constructor-arg value="localhost:9160"/>
    </bean>

Ну все, теперь остается только вызвать методы инжектированного backup. Например backup.LoadYAML("dbyaml/yaml.yml", "i18n") загрузит данные из файла dbyaml/yaml.yml в ColumnFamily i18n в Keyspace1, а backup.SaveYAML("dbyaml/yaml.yml", "i18n") сохранит.

формат yml будет какой-то такой:
cancel: {en: Cancel, ru: Отмена}
save: {en: Save, ru: Сохранить}
from: {en: From, ru: От}
password: {en: Password, ru: Пароль}
time: {en: Time, ru: Время}
timezone: {en: TimeZone, ru: Часовой пояс}
more: {en: Next 20, ru: Следующие 20}
search: {en: Search, ru: Поиск}

1 комментарий:

Andrey Somov комментирует...

Приятно видеть, что SnakeYAML работает.

Удачи,
Андрей Сомов

Мой список блогов