
You can build Tephra directly from the latest source code:

  git clone https://git-wip-us.apache.org/repos/asf/incubator-tephra.git
  cd incubator-tephra
  mvn clean package

After the build completes, you will have a complete binary distribution of Tephra under the tephra-distribution/target/ directory. Copy the tephra-<version>.tar.gz file to your target servers to install it.

If you are writing a client application, add the following dependencies to your Apache Maven POM file (or your build system's equivalent configuration) in order to use the Tephra classes:

  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-api</artifactId>
    <version>${tephra.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-core</artifactId>
    <version>${tephra.version}</version>
  </dependency>

Since the HBase APIs differ between versions, you will need to select the HBase compatibility library matching your HBase version:

For HBase 0.96.x:

  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-0.96</artifactId>
    <version>${tephra.version}</version>
  </dependency>

For HBase 0.98.x:

  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-0.98</artifactId>
    <version>${tephra.version}</version>
  </dependency>

For HBase 1.0.x:

  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.0</artifactId>
    <version>${tephra.version}</version>
  </dependency>

For HBase 1.0.x releases of CDH 5.4, 5.5, or 5.6 (the CDH releases contain API incompatibilities with Apache HBase 1.0.x):

  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.0-cdh</artifactId>
    <version>${tephra.version}</version>
  </dependency>

For HBase 1.1.x or HBase 1.2.x:

  <dependency>
    <groupId>org.apache.tephra</groupId>
    <artifactId>tephra-hbase-compat-1.1</artifactId>
    <version>${tephra.version}</version>
  </dependency>
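Only one HBase compatibility module should be on the classpath at a time. If your build targets multiple HBase versions, one way to manage the choice is with Maven profiles. This is a sketch, not part of Tephra itself; the profile IDs are illustrative:

```xml
<profiles>
  <!-- Default: build against HBase 1.1/1.2 -->
  <profile>
    <id>hbase-1.1</id>
    <activation><activeByDefault>true</activeByDefault></activation>
    <dependencies>
      <dependency>
        <groupId>org.apache.tephra</groupId>
        <artifactId>tephra-hbase-compat-1.1</artifactId>
        <version>${tephra.version}</version>
      </dependency>
    </dependencies>
  </profile>
  <!-- Alternative: build against HBase 0.98 -->
  <profile>
    <id>hbase-0.98</id>
    <dependencies>
      <dependency>
        <groupId>org.apache.tephra</groupId>
        <artifactId>tephra-hbase-compat-0.98</artifactId>
        <version>${tephra.version}</version>
      </dependency>
    </dependencies>
  </profile>
</profiles>
```

Select a non-default profile at build time with, for example, mvn package -P hbase-0.98.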

Deployment and Configuration

Tephra uses a central transaction server to assign unique transaction IDs for data modifications and to perform conflict detection. Only a single transaction server actively handles client requests at a time; however, additional transaction server instances can run simultaneously, providing automatic failover if the active server becomes unreachable.


Transaction Server Configuration

The Tephra transaction server can be deployed on the same cluster nodes that run the HBase HMaster process. The transaction server requires the HBase libraries to be available on the server's Java CLASSPATH.

The transaction server supports the following configuration properties. All of them can be added to the hbase-site.xml file on the server's CLASSPATH:

  Name                        Default    Description
  data.tx.bind.port           15165      Port for the transaction server to bind to
  data.tx.bind.address        0.0.0.0    Server address to listen on
  data.tx.server.io.threads   2          Number of threads for socket IO
  data.tx.server.threads      20         Number of handler threads
  data.tx.timeout             30         Timeout for a transaction to complete (seconds)
  data.tx.long.timeout        86400      Timeout for a long-running transaction (seconds)
  data.tx.cleanup.interval    10         Frequency to check for timed-out transactions (seconds)
  data.tx.snapshot.dir                   HDFS directory used to store snapshots of transaction state
  data.tx.snapshot.interval   300        Frequency at which new snapshots are written (seconds)
  data.tx.snapshot.retain     10         Number of old transaction snapshots to retain
  data.tx.metrics.period      60         Frequency of metrics reporting (seconds)
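For example, to store transaction state snapshots under a specific HDFS directory and write them more frequently than the default, you could add the following to the server's hbase-site.xml (the values shown are illustrative only):

```xml
<property>
  <name>data.tx.snapshot.dir</name>
  <value>/tephra/snapshots</value>
</property>
<property>
  <name>data.tx.snapshot.interval</name>
  <value>120</value>
</property>
```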

To run the transaction server, execute the following command from your Tephra installation directory:

  ./bin/tephra start

Any environment-specific customizations can be made by editing the bin/tephra-env.sh script.

Client Configuration

Since Tephra clients communicate with HBase, the HBase client libraries and the HBase cluster configuration must be available on the client's Java CLASSPATH.

Client API usage is described in the Client APIs section.

The transaction service client supports the following configuration properties. All of them can be added to the hbase-site.xml file on the client's CLASSPATH:

  Name                                  Default   Description
  data.tx.client.timeout                30000     Client socket timeout (milliseconds)
  data.tx.client.provider               pool      Client provider strategy:
                                                    "pool" uses a pool of clients;
                                                    "thread-local" uses a client per thread
                                                  Note: the "thread-local" provider can leak
                                                  resources if threads are recycled
  data.tx.client.count                  50        Maximum number of clients for the "pool" provider
  data.tx.client.obtain.timeout         3000      Timeout (milliseconds) to wait when obtaining a
                                                  client from the "pool" provider
  data.tx.client.retry.strategy         backoff   Client retry strategy:
                                                    "backoff" retries with back-off between attempts;
                                                    "n-times" retries a fixed number of times
  data.tx.client.retry.attempts         2         Number of retries ("n-times" strategy)
  data.tx.client.retry.backoff.initial  100       Initial sleep time in milliseconds ("backoff" strategy)
  data.tx.client.retry.backoff.factor   4         Multiplication factor for the sleep time
  data.tx.client.retry.backoff.limit    30000     Give up once the sleep time reaches this limit (milliseconds)
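With the "backoff" strategy defaults above (initial 100 ms, factor 4, limit 30000 ms), the sleep between attempts grows geometrically until the limit is hit. The sketch below only illustrates the schedule implied by these parameters; the authoritative behavior is Tephra's own retry implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of the sleep schedule implied by the "backoff" retry
// settings documented above; this mirrors the documented parameters,
// not Tephra's actual retry-strategy code.
public class BackoffSchedule {
    // Returns the successive sleep times, stopping once the next sleep
    // would exceed the configured limit (the point at which the client
    // gives up).
    public static List<Long> schedule(long initialMs, long factor, long limitMs) {
        List<Long> sleeps = new ArrayList<>();
        long sleep = initialMs;
        while (sleep <= limitMs) {
            sleeps.add(sleep);
            sleep *= factor;
        }
        return sleeps;
    }

    public static void main(String[] args) {
        // Defaults from the table: 100 ms initial, factor 4, 30000 ms limit.
        System.out.println(schedule(100, 4, 30000)); // [100, 400, 1600, 6400, 25600]
    }
}
```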

HBase Coprocessor Configuration

In addition to the transaction server, Tephra requires an HBase coprocessor to be installed on all tables where transactional reads and writes will be performed.

To configure the coprocessor on all HBase tables, add the following to hbase-site.xml:

  <property>
    <name>hbase.coprocessor.region.classes</name>
    <value>org.apache.tephra.hbase.coprocessor.TransactionProcessor</value>
  </property>

You may configure the TransactionProcessor to be loaded only on HBase tables that you will be using for transaction reads and writes. However, you must ensure that the coprocessor is available on all impacted tables in order for Tephra to function correctly.
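One possible way to load the coprocessor on a single table is through that table's descriptor, using the 0.96-era admin API that the example later in this document also uses. This is a sketch against a live cluster; the table name and the hBaseAdmin instance are illustrative:

```java
HTableDescriptor desc = hBaseAdmin.getTableDescriptor(TableName.valueOf("mytable"));
desc.addCoprocessor("org.apache.tephra.hbase.coprocessor.TransactionProcessor");
hBaseAdmin.modifyTable(TableName.valueOf("mytable"), desc);
```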


Using Existing HBase Tables Transactionally

Tephra overrides HBase cell timestamps with transaction IDs, and uses these transaction IDs to filter out cells older than the TTL (Time-To-Live). Transaction IDs are at a higher scale than cell timestamps. When a regular HBase table that has existing data is converted to a transactional table, existing data may be filtered out during reads. To allow reading of existing data from a transactional table, you will need to set the property data.tx.read.pre.existing as true on the table’s table descriptor.

Note that even without the property data.tx.read.pre.existing being set to true, any existing data will not be removed during compactions. Existing data simply won’t be visible during reads.
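The property can be set on an existing table's descriptor through the HBase admin API. This is a sketch against a live cluster; the table name and the hBaseAdmin instance are illustrative:

```java
HTableDescriptor desc = hBaseAdmin.getTableDescriptor(TableName.valueOf("mytable"));
desc.setValue("data.tx.read.pre.existing", "true");
hBaseAdmin.modifyTable(TableName.valueOf("mytable"), desc);
```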


Metrics Reporting

Tephra ships with built-in support for reporting metrics via JMX and a log file, using the Dropwizard Metrics library.

To enable JMX reporting for metrics, you will need to enable JMX in the Java runtime arguments. Edit the bin/tephra-env.sh script and uncomment the following lines, making any desired changes to configuration for port used, SSL, and JMX authentication:

  export JMX_OPTS="-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=13001"
  export OPTS="$OPTS $JMX_OPTS"

To enable file-based reporting for metrics, edit the conf/logback.xml file and uncomment the following section, replacing the FILE-PATH placeholder with a valid directory on the local filesystem:

  <appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>/FILE-PATH/metrics.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
      <fileNamePattern>metrics.log.%d{yyyy-MM-dd}</fileNamePattern>
      <maxHistory>30</maxHistory>
    </rollingPolicy>
    <encoder>
      <pattern>%d{ISO8601} %msg%n</pattern>
    </encoder>
  </appender>
  <logger name="tephra-metrics" level="TRACE" additivity="false">
    <appender-ref ref="METRICS" />
  </logger>

The frequency of metrics reporting can be adjusted by setting the data.tx.metrics.period configuration property to the desired reporting period, in seconds.
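For example, to report metrics every ten seconds, add the following to hbase-site.xml:

```xml
<property>
  <name>data.tx.metrics.period</name>
  <value>10</value>
</property>
```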

Client APIs

The TransactionAwareHTable class implements HBase's HTableInterface, providing the same API as a standard HBase HTable instance. Only certain operations are supported transactionally. They are:

Transactional Methods
exists(Get get)
exists(List<Get> gets)
get(Get get)
get(List<Get> gets)
batch(List<? extends Row> actions, Object[] results)
batch(List<? extends Row> actions)
batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) [0.96]
batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) [0.96]
getScanner(byte[] family)
getScanner(byte[] family, byte[] qualifier)
put(Put put)
put(List<Put> puts)
delete(Delete delete)
delete(List<Delete> deletes)

The remaining operations are non-transactional and will throw an UnsupportedOperationException if invoked. Non-transactional operations can be allowed by calling setAllowNonTransactional(true); the following methods can then be used non-transactionally:

Non-Transactional Methods
getRowOrBefore(byte[] row, byte[] family)
checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put)
checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
mutateRow(RowMutations rm)
append(Append append)
increment(Increment increment)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)

Note that for batch operations, only certain supported operations (get, put, and delete) are applied transactionally.

Usage

To use a TransactionAwareHTable, you need an instance of TransactionContext. TransactionContext provides the basic contract for client use of transactions. At each point in the transaction lifecycle, it provides the necessary interactions with the Tephra Transaction Server in order to start, commit, and roll back transactions. Basic usage of TransactionContext is handled using the following pattern:

  TransactionContext context = new TransactionContext(client, transactionAwareHTable);
  try {
    context.start();
    transactionAwareHTable.put(new Put(Bytes.toBytes("row")));
    // ...
    context.finish();
  } catch (TransactionFailureException e) {
    context.abort();
  }
  1. First, a new transaction is started using TransactionContext.start().
  2. Next, any data operations are performed within the context of the transaction.
  3. After data operations are complete, TransactionContext.finish() is called to commit the transaction.
  4. If an exception occurs, TransactionContext.abort() can be called to rollback the transaction.

TransactionAwareHTable handles the details of performing data operations transactionally, and implements the necessary hooks in order to commit and rollback the data changes (see TransactionAware).

Example

To demonstrate how you might use TransactionAwareHTable, below is a basic implementation of a SecondaryIndexTable. This class encapsulates the usage of a TransactionContext and provides a simple interface to a user:

  /**
   * A Transactional SecondaryIndexTable.
   */
  public class SecondaryIndexTable {
    private byte[] secondaryIndex;
    private TransactionAwareHTable transactionAwareHTable;
    private TransactionAwareHTable secondaryIndexTable;
    private TransactionContext transactionContext;
    private final TableName secondaryIndexTableName;
    private static final byte[] secondaryIndexFamily =
      Bytes.toBytes("secondaryIndexFamily");
    private static final byte[] secondaryIndexQualifier = Bytes.toBytes("r");
    private static final byte[] DELIMITER  = new byte[] {0};

    public SecondaryIndexTable(TransactionServiceClient transactionServiceClient,
                               HTable hTable, byte[] secondaryIndex) {
      secondaryIndexTableName =
            TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
      HTable secondaryIndexHTable = null;
      HBaseAdmin hBaseAdmin = null;
      try {
        hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
        if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
          hBaseAdmin.createTable(new HTableDescriptor(secondaryIndexTableName));
        }
        secondaryIndexHTable = new HTable(hTable.getConfiguration(),
                                          secondaryIndexTableName);
      } catch (Exception e) {
        Throwables.propagate(e);
      } finally {
        try {
          hBaseAdmin.close();
        } catch (Exception e) {
          Throwables.propagate(e);
        }
      }

      this.secondaryIndex = secondaryIndex;
      this.transactionAwareHTable = new TransactionAwareHTable(hTable);
      this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
      this.transactionContext = new TransactionContext(transactionServiceClient,
                                                       transactionAwareHTable,
                                                       secondaryIndexTable);
    }

    public Result get(Get get) throws IOException {
      return get(Collections.singletonList(get))[0];
    }

    public Result[] get(List<Get> gets) throws IOException {
      try {
        transactionContext.start();
        Result[] result = transactionAwareHTable.get(gets);
        transactionContext.finish();
        return result;
      } catch (Exception e) {
        try {
          transactionContext.abort();
        } catch (TransactionFailureException e1) {
          throw new IOException("Could not rollback transaction", e1);
        }
      }
      return null;
    }

    public Result[] getByIndex(byte[] value) throws IOException {
      try {
        transactionContext.start();
        // scan all index rows whose key starts with the indexed value
        Scan scan = new Scan(value, Bytes.add(value, new byte[] { (byte) 0xff }));
        scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
        ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);

        ArrayList<Get> gets = new ArrayList<Get>();
        for (Result result : indexScanner) {
          for (Cell cell : result.listCells()) {
            gets.add(new Get(cell.getValue()));
          }
        }
        Result[] results = transactionAwareHTable.get(gets);
        transactionContext.finish();
        return results;
      } catch (Exception e) {
        try {
          transactionContext.abort();
        } catch (TransactionFailureException e1) {
          throw new IOException("Could not rollback transaction", e1);
        }
      }
      return null;
    }

    public void put(Put put) throws IOException {
      put(Collections.singletonList(put));
    }


    public void put(List<Put> puts) throws IOException {
      try {
        transactionContext.start();
        ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
        for (Put put : puts) {
          List<Put> indexPuts = new ArrayList<Put>();
          Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
          for (Map.Entry<byte [], List<KeyValue>> family : familyMap) {
            for (KeyValue value : family.getValue()) {
              if (Bytes.equals(value.getQualifier(), secondaryIndex)) {
                byte[] secondaryRow = Bytes.add(value.getQualifier(),
                                                DELIMITER,
                                                Bytes.add(value.getValue(),
                                                DELIMITER,
                                                value.getRow()));
                Put indexPut = new Put(secondaryRow);
                indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
                indexPuts.add(indexPut);
              }
            }
          }
          secondaryIndexPuts.addAll(indexPuts);
        }
        transactionAwareHTable.put(puts);
        secondaryIndexTable.put(secondaryIndexPuts);
        transactionContext.finish();
      } catch (Exception e) {
        try {
          transactionContext.abort();
        } catch (TransactionFailureException e1) {
          throw new IOException("Could not rollback transaction", e1);
        }
      }
    }
  }


 

Known Issues and Limitations

  • Currently, column family Delete operations are implemented by writing a cell with an empty qualifier (empty byte[]) and empty value (empty byte[]). This is done in place of native HBase Delete operations so the delete marker can be rolled back in the event of a transaction failure – normal HBase Delete operations cannot be undone. However, this means that applications that store data in a column with an empty qualifier will not be able to store empty values, and will not be able to transactionally delete that column.
  • Column Delete operations are implemented by writing an empty value (empty byte[]) to the column. This means that applications will not be able to store empty values in columns.
  • Invalid transactions are not automatically cleared from the exclusion list. When a transaction is invalidated, either from timing out or being invalidated by the client due to a failure to rollback changes, its transaction ID is added to a list of excluded transactions. Data from invalidated transactions will be dropped by the TransactionProcessor coprocessor on HBase region flush and compaction operations. Currently, however, transaction IDs can only be manually removed from the list of excluded transaction IDs, using the org.apache.tephra.TransactionAdmin tool.
