Thrift实现

rpc

The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi and other languages.

Apache Thrift软件框架,用于可扩展的跨语言服务开发,将软件堆栈与代码生成引擎相结合,构建在C++、Java、Python、PHP、Ruby、Erlang、Perl、Haskell、C#、Cocoa、JavaScript、Node.js、Smalltalk、OCaml和Delphi以及其他语言之间高效无缝地工作的服务。

最初由FaceBook开发, 主要用于各个服务之间的RPC通信。典型的CS架构。顶层部分是由Thrift定义生成的代码。而服务则由这个文件客户端和处理器代码生成。在生成的代码里会创建不同于内建类型的数据结构,并将其作为结果发送。

下载

1
2
# mac os
$ brew install thrift

定义文档文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 定义需要转换的语言和命名空间路径
namespace java thrift.generated
namespace py thrift.generated

// 文件包含
include "global.thrift"

// 类型定义
typedef i16 short
typedef i32 int
typedef i64 long
typedef bool boolean
typedef string String

// 定义结构体-类
struct Person {
1: optional String username,
2: optional int age,
3: optional boolean married
}

// 定义枚举
enum Gender{
MALE,
FEMALE
}

// 定义常量
const i32 MAX_RETRIES_TIME = 10

// 定义异常
exception DataException {
1: optional String message,
2: optional String callStack,
3: optional String data
}

// 定义调用服务-调用接口
service PersonService {
Persion getPersonByUsername(1: required String username) throws (1: DataException dataException),

void savePerson(1: required Person person) throws (1: DataException dataException)
}

实现-java

导入jar包

1
2
3
4
5
6
dependencies {
testCompile group: 'junit', name: 'junit', version: '4.12'
compile(
"org.apache.thrift:libthrift:0.11.0"
)
}

代码生成

1
2
3
# 生成java代码
# thrift --gen <language> <Thrift filename>
$ thrift --gen java src/thrift/data.thrift

实现

服务实现

1
2
3
4
5
6
7
8
9
10
11
public class PersonServiceImpl implements PersonService.Iface {
@Override
public Person getPersonByUsername(String username) throws DataException, TException {
return new Person().setAge(32).setMarried(true).setUsername("张三");
}

@Override
public void savePerson(Person person) throws DataException, TException {
System.out.println(person);
}
}

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package tt.learn.thrift;

import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
import tt.learn.thrift.generated.PersonService;

public class ThriftServer {

public static void main(String[] args) throws TTransportException {
TNonblockingServerSocket socket = new TNonblockingServerSocket(9999);
THsHaServer.Args arg = new THsHaServer.Args(socket).minWorkerThreads(2).maxWorkerThreads(6);

PersionService.Processor<PersonServiceImpl> processor = new PersonService.Processor<>(new PersonServiceImpl());
// 传输格式
arg.protocolFactory(new TCompactProtocol.Factory());
// 传输方式
arg.transportFactory(new TFramedTransport.Factory());
arg.processorFactory(new TProcessorFactory(processor));
// 服务模型
TServer server = new THsHaServer(arg);

server.serve();
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package tt.learn.thrift;

import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSaslClientTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import tt.learn.thrift.generated.PersonService;
import tt.learn.thrift.generated.Person;

public class ThriftClient {

public static void main(String[] args) {
// 传输方式
TTransport transport = new TFramedTransport(new TSocket("localhost", 9999), 600);
// 传输格式
TProtocol protocol = new TCompactProtocol(transport);

PersonService.Client client = new PersonService.Client(protocol);

try{
transport.open();

Person person = client.getPersonByUsername("张三");

System.out.println(person);

Person person1 = new Person().setAge(32).setMarried(true).setUsername("哈哈");
client.savePerson(person1);

} catch (Exception e){
throw new RuntimeException(e.getMessage(), e);
} finally {
transport.close();
}
}
}

概念

传输格式

TBinaryProtocol

二进制格式

TCompactProtocol

压缩格式

TJSONProtocol

JSON格式

TSimpleJSONProtocol

JSON只写协议, 没有包含类型相关的脚本源数据

TDebugProtocol

可读的文本格式便于debug调试

传输方式

TSocket

阻塞式的socket

TFramedTransport

以frame为单位进行传输, 非阻塞式服务使用

TFileTransport

文件形式传输

TMemoryTransport

内存IO操作, 实现内部简单的ByteArrayOutputStream

TZlibTransport

使用zlib进行压缩与其他传输联合使用–java无实现

服务模型

TSimpleServer

简单的单线程服务模型,用于测试

TThreadPoolServer

多线程服务模型, 使用标准的阻塞式IO

TNonblockingServer

多线程服务模型,使用非阻塞的IO

需要使用TFramedTransport数据传输方式

THsHaServer

使用线程池去处理,其模型把读写任务放在线程池去处理

是TNonblockingServer的扩展实现

Half-sync/Half-async的处理模式

  • Half-async 是在处理IO事件上(accept/read/write io)
  • Half-sync用于handler对RPC的同步处理
 上一篇

数据结构