.. _functionalities:

*************************
Supported functionalities
*************************

This section lists Opaque's supported functionalities, which are a subset of those of Spark SQL. The syntax for these functionalities is the same as in Spark SQL -- Opaque simply replaces the execution to work with encrypted data.

SQL interface
#############

Data types
**********

Out of the existing `Spark SQL types `_, Opaque supports

- All numeric types. ``DecimalType`` is supported via conversion into ``FloatType``
- ``StringType``
- ``BinaryType``
- ``BooleanType``
- ``TimestampType``, ``DateType``
- ``ArrayType``, ``MapType``

Functions
*********

We currently support a subset of the Spark SQL functions, including both scalar and aggregate functions.

- Scalar functions: ``case``, ``cast``, ``concat``, ``contains``, ``if``, ``in``, ``like``, ``substring``, ``upper``
- Aggregate functions: ``average``, ``count``, ``first``, ``last``, ``max``, ``min``, ``sum``

UDFs are not supported directly, but one can :ref:`extend Opaque with additional functions ` by writing them in C++.

Operators
*********

Opaque supports the core SQL operators:

- Projection (e.g., ``SELECT`` statements)
- Filter
- Global aggregation and grouping aggregation
- Order by, sort by
- All join types except: cross join, full outer join, existence join
- Limit
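For example, the following is a minimal sketch of a SQL query running over encrypted data. It assumes the ``edu.berkeley.cs.rise.opaque.implicits._`` import and the ``Utils.initSQLContext`` initialization described in the usage documentation; the data, view, and column names are purely illustrative.

.. code-block:: scala

   import org.apache.spark.sql.SparkSession
   import edu.berkeley.cs.rise.opaque.implicits._

   val spark = SparkSession.builder.appName("OpaqueExample").getOrCreate()
   edu.berkeley.cs.rise.opaque.Utils.initSQLContext(spark.sqlContext)

   // Plaintext DataFrame with illustrative data.
   val df = spark
     .createDataFrame(Seq(("foo", 4), ("bar", 1), ("baz", 5)))
     .toDF("word", "num")

   // .encrypted (from the implicits import) marks the DataFrame as encrypted;
   // Opaque then plans encrypted physical operators for queries over it.
   val dfEncrypted = df.encrypted

   // The SQL syntax itself is unchanged: register a view and query it as usual.
   dfEncrypted.createOrReplaceTempView("words")
   spark
     .sql("SELECT word, SUM(num) AS total FROM words WHERE num > 1 GROUP BY word")
     .show()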
DataFrame interface
###################

Because Opaque SQL only replaces physical operators to work with encrypted data, the DataFrame interface is the same as Spark's, for both `Scala `_ and `Python `_. Opaque SQL is still a work in progress, so not all of these functionalities are currently implemented. See below for a complete list in Scala.
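Continuing the sketch above, the same query can be expressed through the DataFrame interface; the encrypted DataFrame is used exactly like a plaintext one (the names are again illustrative):

.. code-block:: scala

   import org.apache.spark.sql.functions._
   import spark.implicits._

   // dfEncrypted is the encrypted DataFrame from the previous sketch.
   val totals = dfEncrypted
     .filter($"num" > lit(1))
     .groupBy($"word")
     .agg(sum($"num").as("total"))

   // explain() shows the encrypted physical operators substituted by Opaque.
   totals.explain()
   totals.show()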
Supported operations
********************

Actions
-------

- `collect(): Array[T] `_
- `collectAsList(): List[T] `_
- `count(): Long `_
- `first(): T `_
- `foreach(func: ForeachFunction[T]): Unit `_
- `foreach(f: T => Unit): Unit `_
- `foreachPartition(func: ForeachPartitionFunction[T]): Unit `_
- `foreachPartition(f: Iterator[T] => Unit): Unit `_
- `head(): T `_
- `head(n: Int): Array[T] `_
- `show(numRows: Int, truncate: Int, vertical: Boolean): Unit `_
- `show(numRows: Int, truncate: Int): Unit `_
- `show(numRows: Int, truncate: Boolean): Unit `_
- `show(truncate: Boolean): Unit `_
- `show(): Unit `_
- `show(numRows: Int): Unit `_
- `take(n: Int): Array[T] `_
- `takeAsList(n: Int): List[T] `_
- `toLocalIterator(): Iterator[T] `_

Basic Dataset functions
-----------------------

- `cache(): Dataset.this.type `_
- `columns: Array[String] `_
- `createGlobalTempView(viewName: String): Unit `_
- `createOrReplaceGlobalTempView(viewName: String): Unit `_
- `createOrReplaceTempView(viewName: String): Unit `_
- `createTempView(viewName: String): Unit `_
- `dtypes: Array[(String, String)] `_
- `explain(): Unit `_
- `explain(extended: Boolean): Unit `_
- `explain(mode: String): Unit `_
- `hint(name: String, parameters: Any*): Dataset[T] `_
- `inputFiles: Array[String] `_
- `isEmpty: Boolean `_
- `isLocal: Boolean `_
- `javaRDD: JavaRDD[T] `_
- `localCheckpoint(eager: Boolean): Dataset[T] `_
- `localCheckpoint(): Dataset[T] `_
- `printSchema(level: Int): Unit `_
- `printSchema(): Unit `_
- `rdd: org.apache.spark.rdd.RDD[T] `_
- `schema: types.StructType `_
- `storageLevel: org.apache.spark.storage.StorageLevel `_
- `toDF(colNames: String*): DataFrame `_
- `toDF(): DataFrame `_
- `toJavaRDD: JavaRDD[T] `_
- `unpersist(): Dataset.this.type `_
- `unpersist(blocking: Boolean): Dataset.this.type `_
- `write: DataFrameWriter[T] `_
- `writeStream: streaming.DataStreamWriter[T] `_
- `writeTo(table: String): DataFrameWriterV2[T] `_
- `registerTempTable(tableName: String): Unit `_

Streaming
---------

- `isStreaming: Boolean `_
- `withWatermark(eventTime: String, delayThreshold: String): Dataset[T] `_

Typed transformations
---------------------

- `alias(alias: Symbol): Dataset[T] `_
- `alias(alias: String): Dataset[T] `_
- `as(alias: Symbol): Dataset[T] `_
- `as(alias: String): Dataset[T] `_
- `coalesce(numPartitions: Int): Dataset[T] `_
- `distinct(): Dataset[T] `_
- `dropDuplicates(col1: String, cols: String*): Dataset[T] `_
- `dropDuplicates(colNames: Array[String]): Dataset[T] `_
- `dropDuplicates(colNames: Seq[String]): Dataset[T] `_
- `dropDuplicates(): Dataset[T] `_
- `filter(func: FilterFunction[T]): Dataset[T] `_
- `filter(func: T => Boolean): Dataset[T] `_
- `filter(conditionExpr: String): Dataset[T] `_
- `filter(condition: Column): Dataset[T] `_
- `flatMap[U](f: FlatMapFunction[T, U], encoder: Encoder[U]): Dataset[U] `_
- `flatMap[U](func: T => TraversableOnce[U])(implicit evidence: Encoder[U]): Dataset[U] `_
- `groupByKey[K](func: MapFunction[T, K], encoder: Encoder[K]): KeyValueGroupedDataset[K, T] `_
- `groupByKey[K](func: T => K)(implicit evidence: Encoder[K]): KeyValueGroupedDataset[K, T] `_
- `joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)] `_
- `joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)] `_
- `limit(n: Int): Dataset[T] `_
- `map[U](func: MapFunction[T, U], encoder: Encoder[U]): Dataset[U] `_
- `map[U](func: T => U)(implicit evidence: Encoder[U]): Dataset[U] `_
- `mapPartitions[U](f: MapPartitionsFunction[T, U], encoder: Encoder[U]): Dataset[U] `_
- `mapPartitions[U](func: Iterator[T] => Iterator[U])(implicit evidence: Encoder[U]): Dataset[U] `_
- `orderBy(sortExprs: Column*): Dataset[T] `_
- `orderBy(sortCol: String, sortCols: String*): Dataset[T] `_
- `randomSplit(weights: Array[Double]): Array[Dataset[T]] `_
- `randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]] `_
- `randomSplitAsList(weights: Array[Double], seed: Long): List[Dataset[T]] `_
- `repartition(partitionExprs: Column*): Dataset[T] `_
- `repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] `_
- `repartition(numPartitions: Int): Dataset[T] `_
- `repartitionByRange(partitionExprs: Column*): Dataset[T] `_
- `repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] `_
- `select[U1, U2, U3, U4, U5](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], c4: TypedColumn[T, U4], c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] `_
- `select[U1, U2, U3, U4](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3], c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] `_
- `select[U1, U2, U3](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2], c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] `_
- `select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)] `_
- `select[U1](c1: TypedColumn[T, U1]): Dataset[U1] `_
- `sort(sortExprs: Column*): Dataset[T] `_
- `sort(sortCol: String, sortCols: String*): Dataset[T] `_
- `sortWithinPartitions(sortExprs: Column*): Dataset[T] `_
- `sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T] `_
- `transform[U](t: Dataset[T] => Dataset[U]): Dataset[U] `_
- `union(other: Dataset[T]): Dataset[T] `_
- `unionAll(other: Dataset[T]): Dataset[T] `_
- `unionByName(other: Dataset[T], allowMissingColumns: Boolean): Dataset[T] `_
- `unionByName(other: Dataset[T]): Dataset[T] `_
- `where(conditionExpr: String): Dataset[T] `_
- `where(condition: Column): Dataset[T] `_

Untyped transformations
-----------------------

- `agg(expr: Column, exprs: Column*): DataFrame `_
- `agg(exprs: Map[String, String]): DataFrame `_
- `agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame `_
- `apply(colName: String): Column `_
- `col(colName: String): Column `_
- `colRegex(colName: String): Column `_
- `drop(col: Column): DataFrame `_
- `drop(colNames: String*): DataFrame `_
- `drop(colName: String): DataFrame `_
- `groupBy(col1: String, cols: String*): RelationalGroupedDataset `_
- `groupBy(cols: Column*): RelationalGroupedDataset `_
- `hashCode(): Int `_
- `join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame `_
- `join(right: Dataset[_], joinExprs: Column): DataFrame `_
- `join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame `_
- `join(right: Dataset[_], usingColumns: Seq[String]): DataFrame `_
- `join(right: Dataset[_], usingColumn: String): DataFrame `_
- `join(right: Dataset[_]): DataFrame `_
- `na: DataFrameNaFunctions `_
- `select(col: String, cols: String*): DataFrame `_
- `select(cols: Column*): DataFrame `_
- `selectExpr(exprs: String*): DataFrame `_
- `stat: DataFrameStatFunctions `_
- `withColumn(colName: String, col: Column): DataFrame `_
- `withColumnRenamed(existingName: String, newName: String): DataFrame `_

Ungrouped
---------

- `encoder: Encoder[T] `_
- `queryExecution: execution.QueryExecution `_
- `sameSemantics(other: Dataset[T]): Boolean `_
- `semanticHash(): Int `_
- `sparkSession: SparkSession `_
- `sqlContext: SQLContext `_
- `toJSON: Dataset[String] `_
- `toString(): String `_

Unsupported operations
**********************

Actions
-------

- `describe(cols: String*): DataFrame `_
- `reduce(func: ReduceFunction[T]): T `_
- `reduce(func: (T, T) => T): T `_
- `summary(statistics: String*): DataFrame `_
- `tail(n: Int): Array[T] `_

Basic Dataset functions
-----------------------

- `as[U](implicit evidence: Encoder[U]): Dataset[U] `_
- `checkpoint(eager: Boolean): Dataset[T] `_
- `checkpoint(): Dataset[T] `_
- `persist(newLevel: org.apache.spark.storage.StorageLevel): Dataset.this.type `_
- `persist(): Dataset.this.type `_

Typed transformations
---------------------

- `except(other: Dataset[T]): Dataset[T] `_
- `exceptAll(other: Dataset[T]): Dataset[T] `_
- `intersect(other: Dataset[T]): Dataset[T] `_
- `intersectAll(other: Dataset[T]): Dataset[T] `_
- `observe(name: String, expr: Column, exprs: Column*): Dataset[T] `_
- `sample(withReplacement: Boolean, fraction: Double): Dataset[T] `_
- `sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T] `_
- `sample(fraction: Double): Dataset[T] `_
- `sample(fraction: Double, seed: Long): Dataset[T] `_

Untyped transformations
-----------------------

- `crossJoin(right: Dataset[_]): DataFrame `_
- `cube(col1: String, cols: String*): RelationalGroupedDataset `_
- `cube(cols: Column*): RelationalGroupedDataset `_
- `rollup(col1: String, cols: String*): RelationalGroupedDataset `_
- `rollup(cols: Column*): RelationalGroupedDataset `_
- `explode[A, B](inputColumn: String, outputColumn: String)(f: A => TraversableOnce[B])(implicit evidence: reflect.runtime.universe.TypeTag[B]): DataFrame `_
- `explode[A <: Product](input: Column*)(f: Row => TraversableOnce[A])(implicit evidence: reflect.runtime.universe.TypeTag[A]): DataFrame `_

Note that cross joins and full outer joins are not supported, and aggregations with more than one distinct aggregate expression are not supported.

.. _udfs:

#############################
User-Defined Functions (UDFs)
#############################

To run a Spark SQL UDF within Opaque enclaves, first name it explicitly and define it in Scala, then reimplement it in C++ against Opaque's serialized row representation.

For example, suppose we wish to implement a UDF called ``dot``, which computes the dot product of two double arrays (``Array[Double]``). We `define it in Scala <src/main/scala/edu/berkeley/cs/rise/opaque/expressions/DotProduct.scala>`_ in terms of the Breeze linear algebra library's implementation. We can then use it in a DataFrame query, such as `logistic regression `_.
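A condensed sketch of such a definition is shown below. It is illustrative rather than the verbatim contents of ``DotProduct.scala``: the Catalyst expression plumbing (``BinaryExpression``, ``nullSafeEval``, codegen fallback) is assumed here, and input validation is omitted.

.. code-block:: scala

   import breeze.linalg.DenseVector
   import org.apache.spark.sql.Column
   import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression}
   import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
   import org.apache.spark.sql.catalyst.util.ArrayData
   import org.apache.spark.sql.types._

   object dot {
     // Expose the expression as a function usable in DataFrame queries.
     def apply(v1: Column, v2: Column): Column = new Column(DotProduct(v1.expr, v2.expr))
   }

   case class DotProduct(left: Expression, right: Expression)
       extends BinaryExpression with ExpectsInputTypes with CodegenFallback {

     override def inputTypes: Seq[AbstractDataType] =
       Seq(ArrayType(DoubleType), ArrayType(DoubleType))

     override def dataType: DataType = DoubleType

     // Local (non-enclave) evaluation in terms of Breeze; the enclave-side
     // evaluation is the C++ port described in the steps below.
     protected override def nullSafeEval(input1: Any, input2: Any): Any = {
       val v1 = DenseVector(input1.asInstanceOf[ArrayData].toDoubleArray)
       val v2 = DenseVector(input2.asInstanceOf[ArrayData].toDoubleArray)
       v1 dot v2
     }
   }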
Now we can port this UDF to Opaque as follows:

1. Define a corresponding expression using Opaque's expression serialization format by adding the following to `Expr.fbs <src/flatbuffers/Expr.fbs>`_, which indicates that a ``DotProduct`` expression takes two inputs (the two double arrays):

   .. code-block:: protobuf

      table DotProduct {
        left:Expr;
        right:Expr;
      }

   In the same file, add ``DotProduct`` to the list of expressions in ``ExprUnion``.

2. Implement the serialization logic from the Scala ``DotProduct`` UDF to the Opaque expression that we just defined. In ``Utils.flatbuffersSerializeExpression`` (from ``Utils.scala``), add a case for ``DotProduct`` as follows:

   .. code-block:: scala

      case (DotProduct(left, right), Seq(leftOffset, rightOffset)) =>
        tuix.Expr.createExpr(
          builder,
          tuix.ExprUnion.DotProduct,
          tuix.DotProduct.createDotProduct(
            builder, leftOffset, rightOffset))

3. Finally, implement the UDF in C++. In ``FlatbuffersExpressionEvaluator#eval_helper`` (from ``expression_evaluation.h``), add a case for ``tuix::ExprUnion_DotProduct``. Within that case, cast the expression to a ``tuix::DotProduct``, recursively evaluate the left and right children, perform the dot product computation on them, and construct a ``DoubleField`` containing the result.
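   A rough sketch of what this case can look like is shown below. It is illustrative rather than verbatim: the ``tuix`` field accessors used here (e.g., ``value_as_ArrayField``, ``value_as_DoubleField``) and the omitted type checking and error handling are assumptions about the schema and evaluator, not copied from the actual code.

   .. code-block:: cpp

      case tuix::ExprUnion_DotProduct:
      {
        auto dot = static_cast<const tuix::DotProduct *>(expr->expr());

        // Recursively evaluate the two children; each returns an offset to a
        // serialized tuix::Field within `builder`.
        auto left_offset = eval_helper(row, dot->left());
        auto right_offset = eval_helper(row, dot->right());
        const tuix::Field *left =
          flatbuffers::GetTemporaryPointer<tuix::Field>(builder, left_offset);
        const tuix::Field *right =
          flatbuffers::GetTemporaryPointer<tuix::Field>(builder, right_offset);

        // Both children are expected to be arrays of doubles
        // (accessor names assumed from the tuix schema).
        auto left_arr = left->value_as_ArrayField()->value();
        auto right_arr = right->value_as_ArrayField()->value();

        double result = 0.0;
        for (flatbuffers::uoffset_t i = 0; i < left_arr->size(); i++) {
          result += left_arr->Get(i)->value_as_DoubleField()->value()
                    * right_arr->Get(i)->value_as_DoubleField()->value();
        }

        // Wrap the result in a DoubleField, propagating nulls from either child.
        return tuix::CreateField(
          builder,
          tuix::FieldUnion_DoubleField,
          tuix::CreateDoubleField(builder, result).Union(),
          left->is_null() || right->is_null());
      }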