wip: pooltable/framtable handling
This commit is contained in:
@@ -2,10 +2,73 @@ namespace Archive
|
||||
|
||||
open BackingStore
|
||||
|
||||
type IArchive<'Meta, 'Data> =
|
||||
type IArchive<'XAttrs, 'Meta, 'Data> =
|
||||
abstract readIndex: key: StoreKey -> 'Meta
|
||||
abstract writeIndex: key: StoreKey * 'Meta -> unit
|
||||
abstract readXAttrs: key: StoreKey -> 'XAttrs
|
||||
abstract writeXAttrs: key: StoreKey * 'XAttrs -> unit
|
||||
abstract addFrame: key: StoreKey * data: byte[] -> unit
|
||||
abstract delFrame: key: StoreKey -> unit
|
||||
abstract deleteKey: key: StoreKey -> unit
|
||||
abstract purgeArchive: key: StoreKey -> unit
|
||||
abstract purgeArchive: key: StoreKey -> unit
|
||||
|
||||
[<AutoOpen>]
|
||||
module Pool =
|
||||
type internal NineBall =
|
||||
| Head
|
||||
| Tail
|
||||
| Fail
|
||||
|
||||
let internal ckeckAddFrame (pt: FrameTable) (p: PoolTableEntry) =
|
||||
let t0 = fst pt.poolTable[0]
|
||||
let tn = fst pt.poolTable[^0]
|
||||
let dt = (tn - t0) / (pt.poolTable.Length - 1)
|
||||
let tix = fst p
|
||||
// check that the frame has the coorect dt
|
||||
if pt.interval <> 0 && dt = pt.interval then
|
||||
if tix - dt = tn then Tail
|
||||
elif tix + dt = t0 then Head
|
||||
else Fail
|
||||
else
|
||||
if tix > tn then Tail
|
||||
elif tix < t0 then Head
|
||||
else Fail
|
||||
|
||||
let internal ckeckDelFrame (pt: FrameTable) (p: PoolTableEntry) =
|
||||
let t0 = fst pt.poolTable[0]
|
||||
let tn = fst pt.poolTable[^0]
|
||||
let tix = fst p
|
||||
if tix = tn then Tail
|
||||
elif tix = t0 then Head
|
||||
else Fail
|
||||
|
||||
// let private saneInTheMembrane (p: PoolTable) =
|
||||
// p |> Array.fold (fun a (_, pool) -> a && int pool < p.Length) true
|
||||
|
||||
// let private ckeckAddPoolTable (p0: PoolTable) (p: PoolTable) =
|
||||
// let t0 = fst p0[0]
|
||||
// let tn = fst p0[^0]
|
||||
// let dt = (tn - t0) / (p0.Length - 1)
|
||||
// let tx = fst p[0]
|
||||
// let ty = fst p[^0]
|
||||
// let ok, _ = // check that all frames have the same dt
|
||||
// p |> Array.fold (fun (a, t1) (t2, _) -> a && (t2 - t1) = dt, t2) (true, tx - dt)
|
||||
// // printfn $"check add: {ok}, t0={t0}, tn={tn}, dt={dt}, tx={tx}, ty={ty}"
|
||||
// if ok && tx - dt = tn then Tail
|
||||
// elif ok && ty + dt = t0 then Head
|
||||
// else Fail
|
||||
//
|
||||
// let private ckeckDelPoolTable (p0: PoolTable) (p: PoolTable) =
|
||||
// let t0 = fst p0[0]
|
||||
// let tn = fst p0[^0]
|
||||
// let dt = (tn - t0) / (p0.Length - 1)
|
||||
// let tx = fst p[0]
|
||||
// let ty = fst p[^0]
|
||||
// let ok, _ = // check that all frames have the same dt
|
||||
// p |> Array.fold (fun (a, t1) (t2, _) -> a && (t2 - t1) = dt, t2) (true, tx - dt)
|
||||
// // printfn $"check del: t0={t0}, tn={tn}, dt={dt}, tx={tx}, ty={ty}"
|
||||
// if ok && ty = tn then Tail
|
||||
// elif ok && tx = t0 then Head
|
||||
// else Fail
|
||||
|
||||
|
||||
|
||||
@@ -3,43 +3,10 @@ namespace BackingStore
|
||||
open System
|
||||
open System.IO
|
||||
open Thoth.Json.Net
|
||||
open MessagePack
|
||||
open MessagePack.FSharp
|
||||
open FSharpPlus
|
||||
|
||||
type Format =
|
||||
| OZone of string
|
||||
| Zarr of string
|
||||
| Nc of string
|
||||
| Json of string
|
||||
member this.Value =
|
||||
match this with
|
||||
| OZone x -> x
|
||||
| Zarr x -> x
|
||||
| Nc x -> x
|
||||
| Json x -> x
|
||||
static member fromUri (uri: Uri) =
|
||||
if uri.LocalPath.EndsWith ".nc" then Nc uri.LocalPath
|
||||
elif uri.LocalPath.EndsWith ".zarr" then Zarr uri.LocalPath
|
||||
elif uri.LocalPath.EndsWith ".json" then Json uri.LocalPath
|
||||
else OZone uri.LocalPath
|
||||
|
||||
type Store =
|
||||
| O3 of Format
|
||||
| S3 of Format
|
||||
| Dir of Format
|
||||
| File of Format
|
||||
| Unknown
|
||||
static member fromUri (uri: Uri) =
|
||||
let archive = Format.fromUri uri
|
||||
match uri.Scheme with
|
||||
| "o3" -> O3 archive
|
||||
| "s3" -> S3 archive
|
||||
| "dir" -> Dir archive
|
||||
| "file" ->
|
||||
match archive with
|
||||
| OZone _ -> Unknown
|
||||
| _ -> File archive
|
||||
| _ -> Unknown
|
||||
|
||||
type StoreKey(key: string, ext: string option, frame: int option, layer: int option, chunk: int option) =
|
||||
member this.key = key
|
||||
member this.ext = ext
|
||||
@@ -70,7 +37,7 @@ type O2Key(key: string, ?ext: string, ?frame: int, ?layer, ?chunk) =
|
||||
let prop = this.ext |> Option.map (fun y -> $"{y}.") |> Option.defaultValue ""
|
||||
Path.Join [| this.Key; $"{this.Frame}"; $"{prop}{this.Layer}.{this.Chunk}" |]
|
||||
|
||||
type StringKey(key: string, ?ext: string) =
|
||||
type FileKey(key: string, ?ext: string) =
|
||||
inherit StoreKey(key, ext, None, None, None)
|
||||
override this.ToString () = $"{this.Key}{this.Ext}"
|
||||
|
||||
@@ -94,14 +61,49 @@ type XAttrs = {
|
||||
version = 0uy
|
||||
}
|
||||
|
||||
type Frame = int
|
||||
type PoolId = byte
|
||||
|
||||
type PoolName = string
|
||||
|
||||
[<MessagePackObject>]
|
||||
type PoolMap = Map<PoolId, PoolName>
|
||||
|
||||
[<MessagePackObject>]
|
||||
type PoolTableEntry = Frame * PoolId
|
||||
|
||||
[<MessagePackObject>]
|
||||
type PoolIndex =
|
||||
| Key of StoreKey
|
||||
| PoolMap of PoolMap
|
||||
|
||||
[<MessagePackObject>]
|
||||
type FrameTable = {
|
||||
[<Key(0)>]
|
||||
interval: int
|
||||
[<Key(1)>]
|
||||
poolIndex: PoolIndex
|
||||
[<Key(2)>]
|
||||
poolTable: PoolTableEntry[]
|
||||
} with
|
||||
static member empty = { interval = 0; poolIndex = Key (FileKey ("poolIndex", "map")); poolTable = [||] }
|
||||
|
||||
type IBackingStore =
|
||||
abstract readText: key: StoreKey -> string
|
||||
abstract writeText: key: StoreKey * data: string -> unit
|
||||
abstract readBinary: key: StoreKey -> byte[]
|
||||
abstract writeBinary: key: StoreKey * data: byte[] -> unit
|
||||
abstract readFrameTable: key: StoreKey -> FrameTable
|
||||
abstract writeFrameTable: key: StoreKey * data: FrameTable -> unit
|
||||
abstract readXAttrs: key: StoreKey -> XAttrs
|
||||
abstract writeXAttrs: key: StoreKey * XAttrs -> unit
|
||||
abstract keyExists: key: StoreKey -> bool
|
||||
abstract deleteKey: key: StoreKey -> unit
|
||||
|
||||
type StoreSet = {
|
||||
index: IBackingStore
|
||||
pools: IBackingStore[]
|
||||
}
|
||||
|
||||
[<RequireQualifiedAccess>]
|
||||
module Dir =
|
||||
@@ -111,6 +113,17 @@ module Dir =
|
||||
if dir.Length > 0 && not (Path.Exists dir) then
|
||||
Directory.CreateDirectory dir |> ignore
|
||||
{ new IBackingStore with
|
||||
member this.readFrameTable (key: StoreKey) =
|
||||
let map = this.readBinary key
|
||||
let ft = MessagePackSerializer.Deserialize<FrameTable> map
|
||||
match ft.poolIndex with
|
||||
| Key k ->
|
||||
let pm = this.readBinary k
|
||||
{ ft with poolIndex = PoolMap (MessagePackSerializer.Deserialize<PoolMap> pm) }
|
||||
| _ -> ft
|
||||
member this.writeFrameTable (key: StoreKey, table: FrameTable) =
|
||||
let bytes = MessagePackSerializer.Serialize { table with poolIndex = Key key }
|
||||
this.writeBinary (key, bytes)
|
||||
member this.readText key =
|
||||
let fname = Path.Join [| path; $"{key}" |]
|
||||
File.ReadAllText fname
|
||||
@@ -135,6 +148,7 @@ module Dir =
|
||||
let fname = Path.Join [| path; $"{key}.xattrs" |]
|
||||
let json = Encode.Auto.toString attrs
|
||||
File.WriteAllText (fname, json)
|
||||
member this.keyExists (key) = File.Exists (string key)
|
||||
member this.keyExists key = File.Exists (string key)
|
||||
member this.deleteKey key = File.Delete (string key)
|
||||
|
||||
}
|
||||
@@ -128,10 +128,14 @@ let o3Args (args: O3Args) =
|
||||
|
||||
let newO3Store (server: string) (pool: string) = {
|
||||
new IBackingStore with
|
||||
member this.readFrameTable key = failwith "not implemented"
|
||||
member this.writeFrameTable (key, data) = failwith "not implemented"
|
||||
member this.readText key = failwith "not implemented"
|
||||
member this.writeText (key, data) = failwith "not implemented"
|
||||
member this.readBinary key = failwith "not implemented"
|
||||
member this.writeBinary (key, data) = failwith "not implemented"
|
||||
member this.readXAttrs key = failwith "not implemented"
|
||||
member this.writeXAttrs (key, attrs) = failwith "not implemented"
|
||||
member this.keyExists key = failwith "not implemented"
|
||||
member this.deleteKey key = failwith "not implemented"
|
||||
}
|
||||
|
||||
138
src/OZone.fs
138
src/OZone.fs
@@ -30,26 +30,47 @@ type OzoneData = {
|
||||
w = Array.zeroCreate 0
|
||||
}
|
||||
|
||||
type PoolIdx = Frame * Pool
|
||||
type Format =
|
||||
| OZone of string
|
||||
| Zarr of string
|
||||
| Nc of string
|
||||
| Json of string
|
||||
member this.Value =
|
||||
match this with
|
||||
| OZone x -> x
|
||||
| Zarr x -> x
|
||||
| Nc x -> x
|
||||
| Json x -> x
|
||||
static member fromUri (uri: Uri) =
|
||||
if uri.LocalPath.EndsWith ".nc" then Nc uri.LocalPath
|
||||
elif uri.LocalPath.EndsWith ".zarr" then Zarr uri.LocalPath
|
||||
elif uri.LocalPath.EndsWith ".json" then Json uri.LocalPath
|
||||
else OZone uri.LocalPath
|
||||
|
||||
[<MessagePackObject>]
|
||||
type PoolTable = PoolIdx[]
|
||||
|
||||
[<MessagePackObject>]
|
||||
type FrameTable = {
|
||||
[<Key(0)>]
|
||||
pools: string[]
|
||||
[<Key(1)>]
|
||||
frames: PoolIdx[]
|
||||
} with
|
||||
static member empty = { pools = [||]; frames = [||] }
|
||||
type Store =
|
||||
| O3 of Format
|
||||
| S3 of Format
|
||||
| Dir of Format
|
||||
| File of Format
|
||||
| Unknown
|
||||
static member fromUri (uri: Uri) =
|
||||
let archive = Format.fromUri uri
|
||||
match uri.Scheme with
|
||||
| "o3" -> O3 archive
|
||||
| "s3" -> S3 archive
|
||||
| "dir" -> Dir archive
|
||||
| "file" ->
|
||||
match archive with
|
||||
| OZone _ -> Unknown
|
||||
| _ -> File archive
|
||||
| _ -> Unknown
|
||||
|
||||
let readPoolIndex (store: IBackingStore) =
|
||||
let key = StringKey("pool", "idx")
|
||||
let key = FileKey("pool", "idx")
|
||||
store.readText key |> fun s -> s.Split '\n'
|
||||
|
||||
let writePoolIndex (store: IBackingStore) (pools: string[]) =
|
||||
let key = StringKey("pool", "idx")
|
||||
let key = FileKey("pool", "idx")
|
||||
let ps = String.concat "\n" pools
|
||||
store.writeText (key, ps)
|
||||
|
||||
@@ -129,62 +150,31 @@ type O3Index = {
|
||||
properties = [| "s"; "t"; "u"; "v"; "w"; "z" |]
|
||||
}
|
||||
|
||||
type private NineBall =
|
||||
| Head
|
||||
| Tail
|
||||
| Fail
|
||||
type O3Archive(store: StoreSet) =
|
||||
let frameTable = store.index.readFrameTable (FileKey("pool", "idx"))
|
||||
|
||||
let private saneInTheMembrane (p: PoolTable) =
|
||||
p |> Array.fold (fun a (_, pool) -> a && int pool < p.Length) true
|
||||
|
||||
let private ckeckAddPoolTable (p0: PoolTable) (p: PoolTable) =
|
||||
let t0 = fst p0[0]
|
||||
let tn = fst p0[^0]
|
||||
let dt = (tn - t0) / (p0.Length - 1)
|
||||
let tx = fst p[0]
|
||||
let ty = fst p[^0]
|
||||
let ok, _ = // check that all frames have the same dt
|
||||
p |> Array.fold (fun (a, t1) (t2, _) -> a && (t2 - t1) = dt, t2) (true, tx - dt)
|
||||
// printfn $"check add: {ok}, t0={t0}, tn={tn}, dt={dt}, tx={tx}, ty={ty}"
|
||||
if ok && tx - dt = tn then Tail
|
||||
elif ok && ty + dt = t0 then Head
|
||||
else Fail
|
||||
|
||||
let private ckeckDelPoolTable (p0: PoolTable) (p: PoolTable) =
|
||||
let t0 = fst p0[0]
|
||||
let tn = fst p0[^0]
|
||||
let dt = (tn - t0) / (p0.Length - 1)
|
||||
let tx = fst p[0]
|
||||
let ty = fst p[^0]
|
||||
let ok, _ = // check that all frames have the same dt
|
||||
p |> Array.fold (fun (a, t1) (t2, _) -> a && (t2 - t1) = dt, t2) (true, tx - dt)
|
||||
// printfn $"check del: t0={t0}, tn={tn}, dt={dt}, tx={tx}, ty={ty}"
|
||||
if ok && ty = tn then Tail
|
||||
elif ok && tx = t0 then Head
|
||||
else Fail
|
||||
|
||||
type PoolSet = {
|
||||
index: IBackingStore
|
||||
pools: IBackingStore[]
|
||||
}
|
||||
|
||||
type O3Archive(store: IBackingStore) =
|
||||
interface IArchive<O3Index, OzoneData> with
|
||||
interface IArchive<XAttrs, O3Index, OzoneData> with
|
||||
member this.writeIndex (key: StoreKey, meta: O3Index) =
|
||||
let json = Encode.Auto.toString meta
|
||||
store.writeText (key, json)
|
||||
store.index.writeText (key, json)
|
||||
|
||||
member this.readIndex (key: StoreKey) =
|
||||
store.readText key
|
||||
store.index.readText key
|
||||
|> Decode.Auto.fromString<O3Index>
|
||||
|> Result.defaultWith (failwith "invalid index")
|
||||
|
||||
member this.writeXAttrs (key: StoreKey, attrs: XAttrs) =
|
||||
store.index.writeXAttrs (key, attrs)
|
||||
|
||||
member this.readXAttrs (key: StoreKey) =
|
||||
store.index.readXAttrs key
|
||||
|
||||
member this.addFrame (key: StoreKey, data: byte[]) = ()
|
||||
member this.delFrame (key: StoreKey) = ()
|
||||
member this.deleteKey (key: StoreKey) = ()
|
||||
member this.purgeArchive (key: StoreKey) = ()
|
||||
|
||||
member this.Archive = this :> IArchive<_, _>
|
||||
member this.Archive = this :> IArchive<_, _, _>
|
||||
|
||||
member this.writeIndex (aid: Guid) (meta: O3Index) =
|
||||
this.Archive.writeIndex (O3Key aid, meta)
|
||||
@@ -195,47 +185,37 @@ type O3Archive(store: IBackingStore) =
|
||||
member this.writeGrid (aid: Guid) (grid: BinGrid) =
|
||||
let key = O3Key(aid, "grid")
|
||||
let bytes = MessagePackSerializer.Serialize grid
|
||||
store.writeBinary (key, bytes)
|
||||
store.pools[0].writeBinary (key, bytes)
|
||||
|
||||
member this.readGrid (aid: Guid) =
|
||||
let key = O3Key(aid, "grid")
|
||||
let bytes = store.readBinary key
|
||||
let bytes = store.pools[0].readBinary key
|
||||
MessagePackSerializer.Deserialize<BinGrid> bytes
|
||||
|
||||
member this.writeXAttrs (aid: Guid) (attrs: XAttrs) =
|
||||
let key = O3Key(aid, "xattrs")
|
||||
store.writeXAttrs (key, attrs)
|
||||
this.Archive.writeXAttrs (key, attrs)
|
||||
|
||||
member this.readXAttrs (aid: Guid) =
|
||||
let key = O3Key(aid, "xattrs")
|
||||
store.readXAttrs key
|
||||
this.Archive.readXAttrs key
|
||||
|
||||
member this.writePoolTable (aid: Guid) (tab: PoolTable) =
|
||||
let key = O3Key(aid, "pool")
|
||||
let bytes = MessagePackSerializer.Serialize tab
|
||||
store.writeBinary (key, bytes)
|
||||
|
||||
member this.readPoolTable (aid: Guid) =
|
||||
let key = O3Key(aid, "pool")
|
||||
let bytes = store.readBinary key
|
||||
MessagePackSerializer.Deserialize<PoolTable> bytes
|
||||
|
||||
member this.addFrames (aid: Guid) (p: PoolTable) =
|
||||
member this.addFrame (aid: Guid) (p: PoolTableEntry) (frame: Async<OzoneData>)=
|
||||
let xattrs = this.readXAttrs aid
|
||||
let pt0 = this.readPoolTable aid
|
||||
let n = p.Length
|
||||
if saneInTheMembrane p then
|
||||
let pt = this.read aid
|
||||
let n = frameTable.Count
|
||||
if int pix > n then
|
||||
let pt =
|
||||
match ckeckAddPoolTable pt0 p with
|
||||
| Head -> Array.append p pt0
|
||||
| Tail -> Array.append pt0 p
|
||||
match ckeckAddFrame pt p with
|
||||
| Head -> Array.append p pt
|
||||
| Tail -> Array.append pt p
|
||||
| Fail -> failwith "frame mismatch"
|
||||
this.writeXAttrs aid { xattrs with frames = xattrs.frames + n; start = fst pt[0] }
|
||||
this.writePoolTable aid pt
|
||||
else
|
||||
failwith "pool index out of bounds"
|
||||
|
||||
member this.delFrames (aid: Guid) (p: PoolTable) =
|
||||
member this.delFrame (aid: Guid) (p: PoolTableEntry) =
|
||||
let xattrs = this.readXAttrs aid
|
||||
let pt0 = this.readPoolTable aid
|
||||
let n = p.Length
|
||||
|
||||
@@ -36,6 +36,7 @@
|
||||
<PackageReference Include="FSharp.Data" Version="6.6.0" />
|
||||
<PackageReference Include="FSharpPlus" Version="1.7.0" />
|
||||
<PackageReference Include="MessagePack" Version="3.1.4" />
|
||||
<PackageReference Include="MessagePack.FSharpExtensions" Version="4.0.0" />
|
||||
<PackageReference Include="Oceanbox.FvcomKit" Version="5.12.1" />
|
||||
<PackageReference Include="Oceanbox.GeoJson" Version="1.1.0" />
|
||||
<PackageReference Include="ProjNet.FSharp" Version="5.2.0" />
|
||||
|
||||
11
src/Wal.fs
11
src/Wal.fs
@@ -4,13 +4,10 @@ open System
|
||||
open MessagePack
|
||||
open BackingStore
|
||||
|
||||
type Frame = int
|
||||
type Pool = byte
|
||||
|
||||
[<MessagePackObject>]
|
||||
type WalOp =
|
||||
| Add of Pool
|
||||
| Remove of Pool
|
||||
| Add of PoolId
|
||||
| Remove of PoolId
|
||||
|
||||
[<MessagePackObject>]
|
||||
type WalDo =
|
||||
@@ -25,12 +22,12 @@ type Wal = {
|
||||
}
|
||||
|
||||
let readWal (store: IBackingStore) (name: string) =
|
||||
let key = StringKey(name, "wal")
|
||||
let key = FileKey(name, "wal")
|
||||
let bytes = store.readBinary key
|
||||
MessagePackSerializer.Deserialize<Wal[]> bytes
|
||||
|
||||
let appendWal (store: IBackingStore) (name: string) (wal: Wal)=
|
||||
let key = StringKey(name, "wal")
|
||||
let key = FileKey(name, "wal")
|
||||
let oldWal = readWal store name
|
||||
let newWal = Array.append oldWal [| wal |]
|
||||
let bytes = MessagePackSerializer.Serialize newWal
|
||||
|
||||
@@ -62,6 +62,16 @@
|
||||
"Microsoft.NET.StringTools": "17.11.4"
|
||||
}
|
||||
},
|
||||
"MessagePack.FSharpExtensions": {
|
||||
"type": "Direct",
|
||||
"requested": "[4.0.0, )",
|
||||
"resolved": "4.0.0",
|
||||
"contentHash": "JNc4ZmYifEi4PZVLRq9IMoO1RXmUDKj2mJtRCRXoz0SMx1veqabffWotDxsD6o0hVELSRQIKrEIcSjtwYAgsmQ==",
|
||||
"dependencies": {
|
||||
"FSharp.Core": "7.0.200",
|
||||
"MessagePack": "2.4.59"
|
||||
}
|
||||
},
|
||||
"Oceanbox.FvcomKit": {
|
||||
"type": "Direct",
|
||||
"requested": "[5.12.1, )",
|
||||
|
||||
Reference in New Issue
Block a user