feat(tvix/castore): extend blobstore protos for verified streaming

This pdates the proto docstrings a bit, especially w.r.t. verified
streaming.
It also adds send_chunks, send_bao fields to StatBlobRequest (renamed
from BlobMeta)

Change-Id: I590cc8646d86b73bca9f38a9b6d9ea15e4df5cb6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9951
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
Florian Klink 2023-11-05 10:45:08 +02:00 committed by flokli
parent b921e3a7e3
commit 67999f0dcf
5 changed files with 263 additions and 90 deletions

View file

@ -30,6 +30,10 @@ type StatBlobRequest struct {
// The blake3 digest of the blob requested
Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"`
// Whether the server should reply with a list of more granular chunks.
SendChunks bool `protobuf:"varint,2,opt,name=send_chunks,json=sendChunks,proto3" json:"send_chunks,omitempty"`
// Whether the server should reply with a bao.
SendBao bool `protobuf:"varint,3,opt,name=send_bao,json=sendBao,proto3" json:"send_bao,omitempty"`
}
func (x *StatBlobRequest) Reset() {
@ -71,14 +75,36 @@ func (x *StatBlobRequest) GetDigest() []byte {
return nil
}
type BlobMeta struct {
func (x *StatBlobRequest) GetSendChunks() bool {
if x != nil {
return x.SendChunks
}
return false
}
func (x *StatBlobRequest) GetSendBao() bool {
if x != nil {
return x.SendBao
}
return false
}
type StatBlobResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// If `send_chunks` was set to true, this MAY contain a list of more
// granular chunks, which then may be read individually via the `Read`
// method.
Chunks []*StatBlobResponse_ChunkMeta `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks,omitempty"`
// If `send_bao` was set to true, this MAY contain a outboard bao.
// The exact format and message types here will still be fleshed out.
Bao []byte `protobuf:"bytes,3,opt,name=bao,proto3" json:"bao,omitempty"`
}
func (x *BlobMeta) Reset() {
*x = BlobMeta{}
func (x *StatBlobResponse) Reset() {
*x = StatBlobResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_tvix_castore_protos_rpc_blobstore_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -86,13 +112,13 @@ func (x *BlobMeta) Reset() {
}
}
func (x *BlobMeta) String() string {
func (x *StatBlobResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*BlobMeta) ProtoMessage() {}
func (*StatBlobResponse) ProtoMessage() {}
func (x *BlobMeta) ProtoReflect() protoreflect.Message {
func (x *StatBlobResponse) ProtoReflect() protoreflect.Message {
mi := &file_tvix_castore_protos_rpc_blobstore_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
@ -104,11 +130,25 @@ func (x *BlobMeta) ProtoReflect() protoreflect.Message {
return mi.MessageOf(x)
}
// Deprecated: Use BlobMeta.ProtoReflect.Descriptor instead.
func (*BlobMeta) Descriptor() ([]byte, []int) {
// Deprecated: Use StatBlobResponse.ProtoReflect.Descriptor instead.
func (*StatBlobResponse) Descriptor() ([]byte, []int) {
return file_tvix_castore_protos_rpc_blobstore_proto_rawDescGZIP(), []int{1}
}
func (x *StatBlobResponse) GetChunks() []*StatBlobResponse_ChunkMeta {
if x != nil {
return x.Chunks
}
return nil
}
func (x *StatBlobResponse) GetBao() []byte {
if x != nil {
return x.Bao
}
return nil
}
type ReadBlobRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -254,41 +294,112 @@ func (x *PutBlobResponse) GetDigest() []byte {
return nil
}
type StatBlobResponse_ChunkMeta struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Digest of that specific chunk
Digest []byte `protobuf:"bytes,1,opt,name=digest,proto3" json:"digest,omitempty"`
// Length of that chunk, in bytes.
Size uint64 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"`
}
func (x *StatBlobResponse_ChunkMeta) Reset() {
*x = StatBlobResponse_ChunkMeta{}
if protoimpl.UnsafeEnabled {
mi := &file_tvix_castore_protos_rpc_blobstore_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StatBlobResponse_ChunkMeta) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StatBlobResponse_ChunkMeta) ProtoMessage() {}
func (x *StatBlobResponse_ChunkMeta) ProtoReflect() protoreflect.Message {
mi := &file_tvix_castore_protos_rpc_blobstore_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StatBlobResponse_ChunkMeta.ProtoReflect.Descriptor instead.
func (*StatBlobResponse_ChunkMeta) Descriptor() ([]byte, []int) {
return file_tvix_castore_protos_rpc_blobstore_proto_rawDescGZIP(), []int{1, 0}
}
func (x *StatBlobResponse_ChunkMeta) GetDigest() []byte {
if x != nil {
return x.Digest
}
return nil
}
func (x *StatBlobResponse_ChunkMeta) GetSize() uint64 {
if x != nil {
return x.Size
}
return 0
}
var File_tvix_castore_protos_rpc_blobstore_proto protoreflect.FileDescriptor
var file_tvix_castore_protos_rpc_blobstore_proto_rawDesc = []byte{
0x0a, 0x27, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x5f, 0x62, 0x6c, 0x6f, 0x62, 0x73, 0x74,
0x6f, 0x72, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0f, 0x74, 0x76, 0x69, 0x78, 0x2e,
0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x22, 0x29, 0x0a, 0x0f, 0x53, 0x74,
0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x22, 0x65, 0x0a, 0x0f, 0x53, 0x74,
0x61, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a,
0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64,
0x69, 0x67, 0x65, 0x73, 0x74, 0x22, 0x0a, 0x0a, 0x08, 0x42, 0x6c, 0x6f, 0x62, 0x4d, 0x65, 0x74,
0x61, 0x22, 0x29, 0x0a, 0x0f, 0x52, 0x65, 0x61, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x01,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x22, 0x1f, 0x0a, 0x09,
0x42, 0x6c, 0x6f, 0x62, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74,
0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x29, 0x0a,
0x0f, 0x50, 0x75, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x16, 0x0a, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x32, 0xe1, 0x01, 0x0a, 0x0b, 0x42, 0x6c, 0x6f,
0x62, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x04, 0x53, 0x74, 0x61, 0x74,
0x12, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e,
0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x19, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72,
0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x46, 0x0a,
0x04, 0x52, 0x65, 0x61, 0x64, 0x12, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73,
0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x42, 0x6c, 0x6f, 0x62,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63,
0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x68,
0x75, 0x6e, 0x6b, 0x30, 0x01, 0x12, 0x45, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x1a, 0x2e, 0x74,
0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42,
0x6c, 0x6f, 0x62, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x1a, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e,
0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x74, 0x42, 0x6c,
0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x42, 0x28, 0x5a, 0x26,
0x63, 0x6f, 0x64, 0x65, 0x2e, 0x74, 0x76, 0x6c, 0x2e, 0x66, 0x79, 0x69, 0x2f, 0x74, 0x76, 0x69,
0x78, 0x2f, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2d, 0x67, 0x6f, 0x3b, 0x63, 0x61, 0x73,
0x74, 0x6f, 0x72, 0x65, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x69, 0x67, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x65, 0x6e, 0x64, 0x5f, 0x63, 0x68,
0x75, 0x6e, 0x6b, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x73, 0x65, 0x6e, 0x64,
0x43, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x5f, 0x62,
0x61, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x65, 0x6e, 0x64, 0x42, 0x61,
0x6f, 0x22, 0xa2, 0x01, 0x0a, 0x10, 0x53, 0x74, 0x61, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65,
0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x43, 0x0a, 0x06, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73,
0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61,
0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x42, 0x6c, 0x6f,
0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x4d,
0x65, 0x74, 0x61, 0x52, 0x06, 0x63, 0x68, 0x75, 0x6e, 0x6b, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x62,
0x61, 0x6f, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x62, 0x61, 0x6f, 0x1a, 0x37, 0x0a,
0x09, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x4d, 0x65, 0x74, 0x61, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x69,
0x67, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x69, 0x67, 0x65,
0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x04,
0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x22, 0x29, 0x0a, 0x0f, 0x52, 0x65, 0x61, 0x64, 0x42, 0x6c,
0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x69, 0x67,
0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73,
0x74, 0x22, 0x1f, 0x0a, 0x09, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x68, 0x75, 0x6e, 0x6b, 0x12, 0x12,
0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61,
0x74, 0x61, 0x22, 0x29, 0x0a, 0x0f, 0x50, 0x75, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x32, 0xe9, 0x01,
0x0a, 0x0b, 0x42, 0x6c, 0x6f, 0x62, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x4b, 0x0a,
0x04, 0x53, 0x74, 0x61, 0x74, 0x12, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73,
0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x42, 0x6c, 0x6f, 0x62,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63,
0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x42, 0x6c,
0x6f, 0x62, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x04, 0x52, 0x65,
0x61, 0x64, 0x12, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72,
0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x42, 0x6c, 0x6f, 0x62, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73, 0x74,
0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x62, 0x43, 0x68, 0x75, 0x6e, 0x6b,
0x30, 0x01, 0x12, 0x45, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x1a, 0x2e, 0x74, 0x76, 0x69, 0x78,
0x2e, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x6c, 0x6f, 0x62,
0x43, 0x68, 0x75, 0x6e, 0x6b, 0x1a, 0x20, 0x2e, 0x74, 0x76, 0x69, 0x78, 0x2e, 0x63, 0x61, 0x73,
0x74, 0x6f, 0x72, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x74, 0x42, 0x6c, 0x6f, 0x62, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x42, 0x28, 0x5a, 0x26, 0x63, 0x6f, 0x64,
0x65, 0x2e, 0x74, 0x76, 0x6c, 0x2e, 0x66, 0x79, 0x69, 0x2f, 0x74, 0x76, 0x69, 0x78, 0x2f, 0x63,
0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x2d, 0x67, 0x6f, 0x3b, 0x63, 0x61, 0x73, 0x74, 0x6f, 0x72,
0x65, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -303,26 +414,28 @@ func file_tvix_castore_protos_rpc_blobstore_proto_rawDescGZIP() []byte {
return file_tvix_castore_protos_rpc_blobstore_proto_rawDescData
}
var file_tvix_castore_protos_rpc_blobstore_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_tvix_castore_protos_rpc_blobstore_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_tvix_castore_protos_rpc_blobstore_proto_goTypes = []interface{}{
(*StatBlobRequest)(nil), // 0: tvix.castore.v1.StatBlobRequest
(*BlobMeta)(nil), // 1: tvix.castore.v1.BlobMeta
(*StatBlobResponse)(nil), // 1: tvix.castore.v1.StatBlobResponse
(*ReadBlobRequest)(nil), // 2: tvix.castore.v1.ReadBlobRequest
(*BlobChunk)(nil), // 3: tvix.castore.v1.BlobChunk
(*PutBlobResponse)(nil), // 4: tvix.castore.v1.PutBlobResponse
(*StatBlobResponse_ChunkMeta)(nil), // 5: tvix.castore.v1.StatBlobResponse.ChunkMeta
}
var file_tvix_castore_protos_rpc_blobstore_proto_depIdxs = []int32{
0, // 0: tvix.castore.v1.BlobService.Stat:input_type -> tvix.castore.v1.StatBlobRequest
2, // 1: tvix.castore.v1.BlobService.Read:input_type -> tvix.castore.v1.ReadBlobRequest
3, // 2: tvix.castore.v1.BlobService.Put:input_type -> tvix.castore.v1.BlobChunk
1, // 3: tvix.castore.v1.BlobService.Stat:output_type -> tvix.castore.v1.BlobMeta
3, // 4: tvix.castore.v1.BlobService.Read:output_type -> tvix.castore.v1.BlobChunk
4, // 5: tvix.castore.v1.BlobService.Put:output_type -> tvix.castore.v1.PutBlobResponse
3, // [3:6] is the sub-list for method output_type
0, // [0:3] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
5, // 0: tvix.castore.v1.StatBlobResponse.chunks:type_name -> tvix.castore.v1.StatBlobResponse.ChunkMeta
0, // 1: tvix.castore.v1.BlobService.Stat:input_type -> tvix.castore.v1.StatBlobRequest
2, // 2: tvix.castore.v1.BlobService.Read:input_type -> tvix.castore.v1.ReadBlobRequest
3, // 3: tvix.castore.v1.BlobService.Put:input_type -> tvix.castore.v1.BlobChunk
1, // 4: tvix.castore.v1.BlobService.Stat:output_type -> tvix.castore.v1.StatBlobResponse
3, // 5: tvix.castore.v1.BlobService.Read:output_type -> tvix.castore.v1.BlobChunk
4, // 6: tvix.castore.v1.BlobService.Put:output_type -> tvix.castore.v1.PutBlobResponse
4, // [4:7] is the sub-list for method output_type
1, // [1:4] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
}
func init() { file_tvix_castore_protos_rpc_blobstore_proto_init() }
@ -344,7 +457,7 @@ func file_tvix_castore_protos_rpc_blobstore_proto_init() {
}
}
file_tvix_castore_protos_rpc_blobstore_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*BlobMeta); i {
switch v := v.(*StatBlobResponse); i {
case 0:
return &v.state
case 1:
@ -391,6 +504,18 @@ func file_tvix_castore_protos_rpc_blobstore_proto_init() {
return nil
}
}
file_tvix_castore_protos_rpc_blobstore_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StatBlobResponse_ChunkMeta); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -398,7 +523,7 @@ func file_tvix_castore_protos_rpc_blobstore_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_tvix_castore_protos_rpc_blobstore_proto_rawDesc,
NumEnums: 0,
NumMessages: 5,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},

View file

@ -31,21 +31,28 @@ const (
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type BlobServiceClient interface {
// In the future, Stat will expose more metadata about a given blob,
// such as more granular chunking, baos.
// For now, it's only used to check for the existence of a blob, as asking
// this for a non-existing Blob will return a Status::not_found gRPC error.
Stat(ctx context.Context, in *StatBlobRequest, opts ...grpc.CallOption) (*BlobMeta, error)
// Read returns a stream of BlobChunk, which is just a stream of bytes with
// the digest specified in ReadBlobRequest.
//
// Stat can be used to check for the existence of a blob, as well as
// gathering more data about it, like more granular chunking information
// or baos.
// Server implementations are not required to provide more granular chunking
// information, especially if the digest specified in [StatBlobRequest] is
// already a chunk of a blob.
Stat(ctx context.Context, in *StatBlobRequest, opts ...grpc.CallOption) (*StatBlobResponse, error)
// Read allows reading (all) data of a blob/chunk by the BLAKE3 digest of
// its contents.
// If the backend communicated more granular chunks in the `Stat` request,
// this can also be used to read chunks.
// This request returns a stream of BlobChunk, which is just a container for
// a stream of bytes.
// The server may decide on whatever chunking it may seem fit as a size for
// the individual BlobChunk sent in the response stream.
// the individual BlobChunk sent in the response stream, this is mostly to
// keep individual messages at a manageable size.
Read(ctx context.Context, in *ReadBlobRequest, opts ...grpc.CallOption) (BlobService_ReadClient, error)
// Put uploads a Blob, by reading a stream of bytes.
//
// The way the data is chunked up in individual BlobChunk messages sent in
// the stream has no effect on how the server ends up chunking blobs up.
// the stream has no effect on how the server ends up chunking blobs up, if
// it does at all.
Put(ctx context.Context, opts ...grpc.CallOption) (BlobService_PutClient, error)
}
@ -57,8 +64,8 @@ func NewBlobServiceClient(cc grpc.ClientConnInterface) BlobServiceClient {
return &blobServiceClient{cc}
}
func (c *blobServiceClient) Stat(ctx context.Context, in *StatBlobRequest, opts ...grpc.CallOption) (*BlobMeta, error) {
out := new(BlobMeta)
func (c *blobServiceClient) Stat(ctx context.Context, in *StatBlobRequest, opts ...grpc.CallOption) (*StatBlobResponse, error) {
out := new(StatBlobResponse)
err := c.cc.Invoke(ctx, BlobService_Stat_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
@ -136,21 +143,28 @@ func (x *blobServicePutClient) CloseAndRecv() (*PutBlobResponse, error) {
// All implementations must embed UnimplementedBlobServiceServer
// for forward compatibility
type BlobServiceServer interface {
// In the future, Stat will expose more metadata about a given blob,
// such as more granular chunking, baos.
// For now, it's only used to check for the existence of a blob, as asking
// this for a non-existing Blob will return a Status::not_found gRPC error.
Stat(context.Context, *StatBlobRequest) (*BlobMeta, error)
// Read returns a stream of BlobChunk, which is just a stream of bytes with
// the digest specified in ReadBlobRequest.
//
// Stat can be used to check for the existence of a blob, as well as
// gathering more data about it, like more granular chunking information
// or baos.
// Server implementations are not required to provide more granular chunking
// information, especially if the digest specified in [StatBlobRequest] is
// already a chunk of a blob.
Stat(context.Context, *StatBlobRequest) (*StatBlobResponse, error)
// Read allows reading (all) data of a blob/chunk by the BLAKE3 digest of
// its contents.
// If the backend communicated more granular chunks in the `Stat` request,
// this can also be used to read chunks.
// This request returns a stream of BlobChunk, which is just a container for
// a stream of bytes.
// The server may decide on whatever chunking it may seem fit as a size for
// the individual BlobChunk sent in the response stream.
// the individual BlobChunk sent in the response stream, this is mostly to
// keep individual messages at a manageable size.
Read(*ReadBlobRequest, BlobService_ReadServer) error
// Put uploads a Blob, by reading a stream of bytes.
//
// The way the data is chunked up in individual BlobChunk messages sent in
// the stream has no effect on how the server ends up chunking blobs up.
// the stream has no effect on how the server ends up chunking blobs up, if
// it does at all.
Put(BlobService_PutServer) error
mustEmbedUnimplementedBlobServiceServer()
}
@ -159,7 +173,7 @@ type BlobServiceServer interface {
type UnimplementedBlobServiceServer struct {
}
func (UnimplementedBlobServiceServer) Stat(context.Context, *StatBlobRequest) (*BlobMeta, error) {
func (UnimplementedBlobServiceServer) Stat(context.Context, *StatBlobRequest) (*StatBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Stat not implemented")
}
func (UnimplementedBlobServiceServer) Read(*ReadBlobRequest, BlobService_ReadServer) error {

View file

@ -6,33 +6,66 @@ package tvix.castore.v1;
option go_package = "code.tvl.fyi/tvix/castore-go;castorev1";
// BlobService allows reading (or uploading) content-addressed blobs of data.
// BLAKE3 is used as a hashing function for the data. Uploading a blob will
// return the BLAKE3 digest of it, and that's the identifier used to Read/Stat
// them too.
service BlobService {
// In the future, Stat will expose more metadata about a given blob,
// such as more granular chunking, baos.
// For now, it's only used to check for the existence of a blob, as asking
// this for a non-existing Blob will return a Status::not_found gRPC error.
rpc Stat(StatBlobRequest) returns (BlobMeta);
// Stat can be used to check for the existence of a blob, as well as
// gathering more data about it, like more granular chunking information
// or baos.
// Server implementations are not required to provide more granular chunking
// information, especially if the digest specified in [StatBlobRequest] is
// already a chunk of a blob.
rpc Stat(StatBlobRequest) returns (StatBlobResponse);
// Read returns a stream of BlobChunk, which is just a stream of bytes with
// the digest specified in ReadBlobRequest.
//
// Read allows reading (all) data of a blob/chunk by the BLAKE3 digest of
// its contents.
// If the backend communicated more granular chunks in the `Stat` request,
// this can also be used to read chunks.
// This request returns a stream of BlobChunk, which is just a container for
// a stream of bytes.
// The server may decide on whatever chunking it may seem fit as a size for
// the individual BlobChunk sent in the response stream.
// the individual BlobChunk sent in the response stream, this is mostly to
// keep individual messages at a manageable size.
rpc Read(ReadBlobRequest) returns (stream BlobChunk);
// Put uploads a Blob, by reading a stream of bytes.
//
// The way the data is chunked up in individual BlobChunk messages sent in
// the stream has no effect on how the server ends up chunking blobs up.
// the stream has no effect on how the server ends up chunking blobs up, if
// it does at all.
rpc Put(stream BlobChunk) returns (PutBlobResponse);
}
message StatBlobRequest {
// The blake3 digest of the blob requested
bytes digest = 1;
// Whether the server should reply with a list of more granular chunks.
bool send_chunks = 2;
// Whether the server should reply with a bao.
bool send_bao = 3;
}
message BlobMeta {
message StatBlobResponse {
// If `send_chunks` was set to true, this MAY contain a list of more
// granular chunks, which then may be read individually via the `Read`
// method.
repeated ChunkMeta chunks = 2;
message ChunkMeta {
// Digest of that specific chunk
bytes digest = 1;
// Length of that chunk, in bytes.
uint64 size = 2;
}
// If `send_bao` was set to true, this MAY contain a outboard bao.
// The exact format and message types here will still be fleshed out.
bytes bao = 3;
}
message ReadBlobRequest {

View file

@ -56,6 +56,7 @@ impl BlobService for GRPCBlobService {
let resp = grpc_client
.stat(proto::StatBlobRequest {
digest: digest.clone().into(),
..Default::default()
})
.await;

View file

@ -93,7 +93,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
async fn stat(
&self,
request: Request<super::StatBlobRequest>,
) -> Result<Response<super::BlobMeta>, Status> {
) -> Result<Response<super::StatBlobResponse>, Status> {
let rq = request.into_inner();
let req_digest = rq
.digest
@ -101,7 +101,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
.map_err(|_e| Status::invalid_argument("invalid digest length"))?;
match self.blob_service.has(&req_digest).await {
Ok(true) => Ok(Response::new(super::BlobMeta::default())),
Ok(true) => Ok(Response::new(super::StatBlobResponse::default())),
Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
Err(e) => Err(e.into()),
}