Напомню, что любая key-value БД, и Кассандра в частности, является не чем иным, как Map<String,Map<String,String>> при строковом ключе. Это вполне позволяет сделать универсальный загрузчик данных, который и будет представлен далее.
Интерфейс
/*
* Copyright 2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.applusion.cassadraspring;
import static me.prettyprint.cassandra.utils.StringUtils.bytes;
import static me.prettyprint.cassandra.utils.StringUtils.string;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import me.prettyprint.cassandra.dao.SpringCommand;
import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.Keyspace;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.support.AbstractMessageSource;
/**
 * MessageSource implementation that reads its messages from Apache Cassandra.
 *
 * <p>Each message code is used as a row key in the configured column family.
 * Every column of that row maps a language code (compared against
 * {@link Locale#getLanguage()}) to the message pattern for that language.
 * When the requested language has no column, the {@code "en"} column is used.
 *
 * <p>All rows are preloaded into a local cache from {@link #afterPropertiesSet()};
 * codes missing from the cache are looked up in Cassandra on demand.
 *
 * <p>NOTE(review): the cache is a plain {@link HashMap} that is also written
 * from {@link #resolveCode(String, Locale)}. If this bean is shared between
 * threads, confirm whether a concurrent map is needed.
 *
 * @author Oleg Smith
 */
public class CassandraResourceBundleMessageSource extends AbstractMessageSource
        implements InitializingBean {

    /** Language whose message is used when the requested one has no translation. */
    private static final String FALLBACK_LANGUAGE = "en";

    /** Maximum number of row keys requested per range query while preloading. */
    private static final int KEYS_PER_PAGE = 1000;

    /** Maximum number of columns read from a single row. */
    private static final int MAX_COLUMNS_PER_ROW = 1500;

    private CassandraClientPool cassandraClientPool;
    private String keyspace = "Keyspace1";
    private String columnFamilyName = "Standard2";
    private ConsistencyLevel consistencyLevel = CassandraClient.DEFAULT_CONSISTENCY_LEVEL;

    /**
     * Local cache: message code -> (language code -> message pattern).
     */
    private final Map<String, Map<String, String>> cache =
            new HashMap<String, Map<String, String>>();

    public CassandraResourceBundleMessageSource() {
    }

    /**
     * Resolves the given code, consulting the local cache first and falling
     * back to a direct Cassandra lookup on a cache miss.
     *
     * @param code   the message code to look up
     * @param locale the locale whose language selects the message
     * @return the message format, or {@code null} if no message was found
     */
    @Override
    protected MessageFormat resolveCode(String code, Locale locale) {
        MessageFormat cached = resolveCodeInCache(code, locale);
        if (cached != null) {
            return cached;
        }
        return resolveCodeInCassandra(code, locale);
    }

    /**
     * Resolves the code using only the local cache.
     *
     * @return the message format, or {@code null} if the code is not cached or
     *         has neither the requested nor the fallback language
     */
    private MessageFormat resolveCodeInCache(String code, Locale locale) {
        return createFormat(cache.get(code), locale);
    }

    /**
     * Resolves the code by reading its row from Cassandra and caches the row.
     *
     * @return the message format, or {@code null} if the row does not exist,
     *         has no suitable language, or the lookup failed
     */
    private MessageFormat resolveCodeInCassandra(String code, Locale locale) {
        try {
            List<Column> columns = getColumnsByKey(code);
            if (columns == null) {
                // Row not found: previously this ended in a swallowed
                // NullPointerException; the outcome (null, nothing cached)
                // is the same, minus the stack trace.
                return null;
            }
            Map<String, String> values = toStringMap(columns);
            cache.put(code, values);
            return createFormat(values, locale);
        } catch (Exception e) {
            // Best effort: an unavailable store must not break message
            // resolution, the caller simply sees "no message".
            logger.warn("Could not resolve message code '" + code + "' in Cassandra", e);
            return null;
        }
    }

    /**
     * Picks the pattern for the locale's language, falling back to
     * {@link #FALLBACK_LANGUAGE}, and wraps it in a {@link MessageFormat}.
     *
     * @param values language code -> pattern; may be {@code null}
     * @return the message format, or {@code null} if no pattern is available
     */
    private MessageFormat createFormat(Map<String, String> values, Locale locale) {
        if (values == null) {
            return null;
        }
        String pattern = values.get(locale.getLanguage());
        if (pattern == null) {
            pattern = values.get(FALLBACK_LANGUAGE);
        }
        return pattern != null ? new MessageFormat(pattern, locale) : null;
    }

    /**
     * Converts Thrift columns into a column name -> column value string map.
     *
     * @param columns the columns of one row; {@code null} yields an empty map
     */
    private Map<String, String> toStringMap(List<Column> columns) {
        Map<String, String> values = new HashMap<String, String>();
        if (columns != null) {
            for (Column column : columns) {
                values.put(string(column.name), string(column.value));
            }
        }
        return values;
    }

    /**
     * Preloads the cache once all bean properties have been set.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        loadAll();
    }

    /**
     * Loads every row of the column family into the cache, page by page.
     *
     * <p>Each page starts at the last key of the previous page, so that key is
     * returned again; re-caching it is harmless. Paging stops when a page is
     * empty or yields no key beyond its own start key. Unlike a check on the
     * page size alone, this also loads a column family holding a single row.
     *
     * <p>Assumes the map returned by the range query iterates in range order,
     * so the last entry seen is the start of the next page.
     * TODO confirm against the client library in use.
     */
    private void loadAll() {
        try {
            String startKey = "";
            while (true) {
                Map<String, List<Column>> page = getColumnsByKeys(startKey, KEYS_PER_PAGE);
                if (page == null || page.isEmpty()) {
                    break;
                }
                String lastKey = null;
                for (Map.Entry<String, List<Column>> row : page.entrySet()) {
                    lastKey = row.getKey();
                    cache.put(lastKey, toStringMap(row.getValue()));
                }
                if (lastKey == null || lastKey.equals(startKey)) {
                    // Only the start key came back: nothing left to read.
                    break;
                }
                startKey = lastKey;
            }
        } catch (Exception e) {
            // Best effort: if preloading fails, codes are still resolved
            // lazily through resolveCode.
            logger.warn("Could not preload messages from Cassandra", e);
        }
    }

    /**
     * Builds a predicate selecting up to {@link #MAX_COLUMNS_PER_ROW} columns
     * of a row, with open start and end column names.
     */
    private SlicePredicate allColumnsPredicate() {
        SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], false,
                MAX_COLUMNS_PER_ROW);
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(sliceRange);
        return predicate;
    }

    /**
     * Reads up to {@code keyCount} rows starting at {@code keyStart}, with up
     * to {@link #MAX_COLUMNS_PER_ROW} columns each.
     *
     * @param keyStart first key of the range; the empty string means no lower bound
     * @param keyCount maximum number of rows to return
     * @return row key -> columns of that row
     * @throws Exception if the range query fails
     */
    private Map<String, List<Column>> getColumnsByKeys(final String keyStart,
            final int keyCount) throws Exception {
        return execute(new SpringCommand<Map<String, List<Column>>>(
                cassandraClientPool) {
            @Override
            public Map<String, List<Column>> execute(final Keyspace ks)
                    throws Exception {
                ColumnParent parent = new ColumnParent(columnFamilyName);
                KeyRange range = new KeyRange(keyCount);
                range.setStart_key(keyStart);
                range.setEnd_key("");
                return ks.getRangeSlices(parent, allColumnsPredicate(), range);
            }
        });
    }

    /**
     * Reads up to {@link #MAX_COLUMNS_PER_ROW} columns of the row with the
     * given key.
     *
     * @param key the row key
     * @return the columns of the row, or {@code null} if the lookup raised
     *         {@link NotFoundException}
     * @throws Exception if the query fails for any other reason
     */
    public List<Column> getColumnsByKey(final String key) throws Exception {
        return execute(new SpringCommand<List<Column>>(cassandraClientPool) {
            @Override
            public List<Column> execute(final Keyspace ks) throws Exception {
                try {
                    ColumnParent parent = new ColumnParent(columnFamilyName);
                    return ks.getSlice(key, parent, allColumnsPredicate());
                } catch (NotFoundException e) {
                    return null;
                }
            }
        });
    }

    /**
     * Creates a column path for the given column name in the configured
     * column family.
     */
    protected ColumnPath createColumnPath(String columnName) {
        return new ColumnPath(columnFamilyName).setColumn(bytes(columnName));
    }

    /**
     * Runs the command against the configured keyspace using a client borrowed
     * from the pool, and always returns that client to the pool.
     *
     * @param command the command to run
     * @return whatever the command returns
     * @throws Exception if borrowing the client, opening the keyspace or the
     *         command itself fails
     */
    protected <T> T execute(SpringCommand<T> command) throws Exception {
        CassandraClient client = cassandraClientPool.borrowClient();
        try {
            // Opening the keyspace happens inside the try block so that the
            // borrowed client is released even when this call throws.
            Keyspace ks = client.getKeyspace(keyspace, consistencyLevel);
            return command.execute(ks);
        } finally {
            cassandraClientPool.releaseClient(client);
        }
    }

    public CassandraClientPool getCassandraClientPool() {
        return cassandraClientPool;
    }

    public void setCassandraClientPool(CassandraClientPool cassandraClientPool) {
        this.cassandraClientPool = cassandraClientPool;
    }

    public String getKeyspace() {
        return keyspace;
    }

    public void setKeyspace(String keyspace) {
        this.keyspace = keyspace;
    }

    public String getColumnFamilyName() {
        return columnFamilyName;
    }

    public void setColumnFamilyName(String columnFamilyName) {
        this.columnFamilyName = columnFamilyName;
    }
}
Настройка Spring
<!-- The bean whose methods are called from application code; it receives the client pool and the keyspace name. -->
<bean id="backup" class="com.applusion.cassadraspring.CassandraYAMLLoader">
<property name="cassandraClientPool" ref="cassandraClientPool"/>
<property name="keyspace" ref="keyspace"/>
</bean>
<!-- Keyspace name, exposed as a String bean so it can be injected by reference. -->
<bean id="keyspace" class="java.lang.String">
<constructor-arg><value>Keyspace1</value></constructor-arg>
</bean>
<!-- Hector monitoring beans. -->
<bean id="cassandraClientMonitor" class="me.prettyprint.cassandra.service.CassandraClientMonitor"/>
<bean id="jmxMonitor" class="me.prettyprint.cassandra.service.JmxMonitor" factory-method="getInstance"/>
<!-- The pool factory is obtained through its static getInstance(); the pool itself is created by calling createNew on it with the host configurator. -->
<bean id="cassandraClientPoolFactory" class="me.prettyprint.cassandra.service.CassandraClientPoolFactory" factory-method="getInstance"/>
<bean id="cassandraClientPool" factory-bean="cassandraClientPoolFactory" factory-method="createNew" >
<constructor-arg><ref bean="cassandraHostConfigurator"/></constructor-arg>
</bean>
<!-- Address (host:port) of the Cassandra node to connect to. -->
<bean id="cassandraHostConfigurator" class="me.prettyprint.cassandra.service.CassandraHostConfigurator">
<constructor-arg value="localhost:9160"/>
</bean>
Ну всё, теперь остаётся только вызвать методы инжектированного backup. Например, backup.LoadYAML("dbyaml/yaml.yml", "i18n") загрузит данные из файла dbyaml/yaml.yml в ColumnFamily i18n в Keyspace1, а backup.SaveYAML("dbyaml/yaml.yml", "i18n") сохранит.
Формат yml будет примерно такой:
cancel: {en: Cancel, ru: Отмена}
save: {en: Save, ru: Сохранить}
from: {en: From, ru: От}
password: {en: Password, ru: Пароль}
time: {en: Time, ru: Время}
timezone: {en: TimeZone, ru: Часовой пояс}
more: {en: Next 20, ru: Следующие 20}
search: {en: Search, ru: Поиск}
1 комментарий:
Приятно видеть, что SnakeYAML работает.
Удачи,
Андрей Сомов
Отправить комментарий